1

Ich habe versucht, NiFi Flowfile Attribute von Kafka Nachricht in Spark Streaming zugreifen. Ich benutze Java als Sprache.NiFi Flowfile Attribute von KafkaConsumer

Das Szenario ist, dass NiFI Binärdateien von FTP-Speicherort mit GetSFTP-Prozessor liest und Byte [] Nachrichten an Kafka publishKafka Prozessor veröffentlicht. Diese byte [] - Attribute werden mithilfe des Spark-Streaming-Auftrags in ASCII-Daten konvertiert. Diese decodierten ASCII-Daten werden zur weiteren Verarbeitung an Kafka gesendet und auf dem NiFi-Prozessor in HDFS gespeichert.

Mein Problem ist, dass ich binären Dateinamen und dekodierte ASCII-Datei nicht verfolgen kann. Ich muss einen Header-Abschnitt (für Dateiname, Dateigröße, Anzahl der Datensätze usw.) in meinem decodierten ASCII-Code hinzufügen, aber es ist mir nicht gelungen herauszufinden, wie der Dateiname von NiFi Flowfile vom KafkaConsumer-Objekt abgerufen werden kann. Gibt es eine Möglichkeit, dass ich dies mit Standard-NiFi-Prozessoren machen kann? Oder teilen Sie uns bitte weitere Vorschläge mit, um diese Funktionalität zu erreichen. Vielen Dank.

Antwort

1

So Ihr Datenfluss:

FTP -> Nifi -> Kafka -> Spark-Streaming -> Kafka -> Nifi -> HDFS ?

Momentan hat Kafka keine Metadatenattribute für jede Nachricht (obwohl ich glaube, dass dies in Kafka 0.11 vorkommen könnte). Wenn NiFi also eine Nachricht zu einem Thema veröffentlicht, kann es die Attribute der Datenflussdatei nicht mit übergeben die Nachricht.

Sie müssten eine Art Wrapper-Datenformat (möglicherweise JSON oder Avro) erstellen, das den ursprünglichen Inhalt + die zusätzlichen Attribute enthält, die Sie benötigen, damit Sie das Ganze als Inhalt einer Nachricht an Kafka veröffentlichen können.

Außerdem weiß ich nicht genau, was Sie in Ihrem Spark-Streaming-Job machen, aber gibt es einen Grund, warum Sie nicht einfach diesen Teil in NiFi machen können? Es klingt nicht wie etwas Komplexes mit Fenstern oder Joins, also könntest du die Dinge ein wenig vereinfachen und NiFi die Decodierung machen lassen, dann musst du es Kafka und HDFS schreiben.

+0

Ja, das ist der Datenfluss. Ich hoffe, dass Metadatenattribute in Kafka 0.11 verfügbar sind. Ich werde prüfen, Daten zu verpacken. Bezüglich des Spark-Streaming-Jobs führe ich eine Fensterung durch, um einige Ereignisse auszulösen. Ich bin neu in NiFi, kannst du mir bitte erklären, wie ich mein Byte [] in NiFi verpacken kann? – Shahzad

+0

Der einfachste Weg besteht darin, ein Groovy- oder Jython-Skript zu schreiben und es über den ExecuteScript-Prozessor auszuführen. Es ist nicht schön, aber vielleicht können Sie in der Flow-Datei Bytes lesen und Base64 kodieren sie dann ein JSON-Dokument wie {"Daten": , "Dateiname": "The-Flow-Dateiname"} –

+0

https://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html –

Verwandte Themen