2016-02-02 6 views
7

Ich möchte Flink so einrichten, dass es die Datenströme von Apache Kafka zu MongoDB transformiert und umleitet. Zu Testzwecken baue ich oben auf flink-streaming-connectors.kafka als Beispiel (https://github.com/apache/flink).Kafka -> Flink DataStream -> MongoDB

Kafka-Streams werden von Flink ordnungsgemäß rot, ich kann sie zuordnen usw., aber das Problem tritt auf, wenn ich jede empfangene und umgewandelte Nachricht in MongoDB speichern möchte. Das einzige Beispiel, das ich über MongoDB Integration gefunden habe, ist flink-mongodb-test von github. Leider verwendet es statische Datenquelle (Datenbank), nicht den Datenstrom.

Ich glaube, es sollte einige DataStream.addSink-Implementierung für MongoDB sein, aber anscheinend gibt es nicht.

Was wäre der beste Weg, um es zu erreichen? Muss ich die benutzerdefinierte Sink-Funktion schreiben oder fehlt mir vielleicht etwas? Vielleicht sollte es anders gemacht werden?

Ich bin nicht an irgendeine Lösung gebunden, so dass jeder Vorschlag geschätzt werden würde.

Unten gibt es ein Beispiel was genau ich als Eingabe bekomme und was ich als Ausgabe speichern muss.

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String) 
Apache Kafka Broker --------------> Flink: DataStream<String> 

Flink: DataStream.map({ 
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD") 
}) 
.rebalance() 
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection 

Wie Sie in diesem Beispiel sehen kann ich Flink bin mit meist für Kafkas Nachrichtenstrom Pufferung und einige grundlegende Parsing.

Antwort

3

Es ist derzeit keine Streaming MongoDB-Senke in Flink verfügbar.

Allerdings gibt es zwei Möglichkeiten, um Daten in MongoDB Schreiben:

  • Verwenden Sie den DataStream.write() Aufruf von Flink. Es ermöglicht Ihnen, ein beliebiges OutputFormat (aus der Batch-API) mit Streaming zu verwenden. Mit dem HadoopOutputFormatWrapper von Flink können Sie den offiziellen MongoDB Hadoop-Konnektor

  • Implementieren Sie den Sink selbst. Die Implementierung von Senken ist mit der Streaming-API recht einfach, und ich bin sicher, dass MongoDB über eine gute Java-Client-Bibliothek verfügt.

Beide Ansätze bieten keine anspruchsvollen Verarbeitungsgarantien. Wenn Sie jedoch Flink mit Kafka verwenden (und Checkpointing aktiviert), haben Sie mindestens eine Semantik: In einem Fehlerfall werden die Daten erneut in die MongoDB-Senke gestreamt. Wenn Sie idempotente Aktualisierungen durchführen, sollte das Wiederholen dieser Aktualisierungen keine Inkonsistenzen verursachen.

Wenn Sie wirklich genau einmal Semantik für MongoDB benötigen, sollten Sie wahrscheinlich eine JIRA in Flink Datei einreichen und mit der Community diskutieren, wie dies zu implementieren ist.

2

Als Alternative zur Antwort von Robert Metzger können Sie Ihre Ergebnisse erneut an Kafka schreiben und dann einen der gepflegten Kafka-Konnektoren verwenden, um den Inhalt eines Themas in Ihrer MongoDB-Datenbank zu löschen.

Kafka -> Flink -> Kafka -> Mongo/Alles

Mit diesem Ansatz können Sie den "at-least-once Semantik" behaivour mantain.

Verwandte Themen