Ich habe versucht, NiFi Flowfile Attribute von Kafka Nachricht in Spark Streaming zugreifen. Ich benutze Java als Sprache.NiFi Flowfile Attribute von KafkaConsumer
Das Szenario ist, dass NiFI Binärdateien von FTP-Speicherort mit GetSFTP-Prozessor liest und Byte [] Nachrichten an Kafka publishKafka Prozessor veröffentlicht. Diese byte [] - Attribute werden mithilfe des Spark-Streaming-Auftrags in ASCII-Daten konvertiert. Diese decodierten ASCII-Daten werden zur weiteren Verarbeitung an Kafka gesendet und auf dem NiFi-Prozessor in HDFS gespeichert.
Mein Problem ist, dass ich binären Dateinamen und dekodierte ASCII-Datei nicht verfolgen kann. Ich muss einen Header-Abschnitt (für Dateiname, Dateigröße, Anzahl der Datensätze usw.) in meinem decodierten ASCII-Code hinzufügen, aber es ist mir nicht gelungen herauszufinden, wie der Dateiname von NiFi Flowfile vom KafkaConsumer-Objekt abgerufen werden kann. Gibt es eine Möglichkeit, dass ich dies mit Standard-NiFi-Prozessoren machen kann? Oder teilen Sie uns bitte weitere Vorschläge mit, um diese Funktionalität zu erreichen. Vielen Dank.
Ja, das ist der Datenfluss. Ich hoffe, dass Metadatenattribute in Kafka 0.11 verfügbar sind. Ich werde prüfen, Daten zu verpacken. Bezüglich des Spark-Streaming-Jobs führe ich eine Fensterung durch, um einige Ereignisse auszulösen. Ich bin neu in NiFi, kannst du mir bitte erklären, wie ich mein Byte [] in NiFi verpacken kann? – Shahzad
Der einfachste Weg besteht darin, ein Groovy- oder Jython-Skript zu schreiben und es über den ExecuteScript-Prozessor auszuführen. Es ist nicht schön, aber vielleicht können Sie in der Flow-Datei Bytes lesen und Base64 kodieren sie dann ein JSON-Dokument wie {"Daten":, "Dateiname": "The-Flow-Dateiname"} –
https://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html –