Ich möchte Flink so einrichten, dass es die Datenströme von Apache Kafka zu MongoDB transformiert und umleitet. Zu Testzwecken baue ich oben auf flink-streaming-connectors.kafka als Beispiel (https://github.com/apache/flink).Kafka -> Flink DataStream -> MongoDB
Kafka-Streams werden von Flink ordnungsgemäß rot, ich kann sie zuordnen usw., aber das Problem tritt auf, wenn ich jede empfangene und umgewandelte Nachricht in MongoDB speichern möchte. Das einzige Beispiel, das ich über MongoDB Integration gefunden habe, ist flink-mongodb-test von github. Leider verwendet es statische Datenquelle (Datenbank), nicht den Datenstrom.
Ich glaube, es sollte einige DataStream.addSink-Implementierung für MongoDB sein, aber anscheinend gibt es nicht.
Was wäre der beste Weg, um es zu erreichen? Muss ich die benutzerdefinierte Sink-Funktion schreiben oder fehlt mir vielleicht etwas? Vielleicht sollte es anders gemacht werden?
Ich bin nicht an irgendeine Lösung gebunden, so dass jeder Vorschlag geschätzt werden würde.
Unten gibt es ein Beispiel was genau ich als Eingabe bekomme und was ich als Ausgabe speichern muss.
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
Wie Sie in diesem Beispiel sehen kann ich Flink bin mit meist für Kafkas Nachrichtenstrom Pufferung und einige grundlegende Parsing.