2017-12-15 4 views
3

Ich versuche derzeit, einfach Nachrichten von einem Thema auf einem Kafka-Cluster zu einem anderen zu streamen (Remote -> Lokaler Cluster).
Die Idee ist, sofort Kafka-Streams zu verwenden, damit wir nicht die tatsächlichen Nachrichten auf dem lokalen Cluster replizieren müssen, sondern nur die "Ergebnisse" der Verarbeitung von Kafka-Streams in unsere Kafka-Themen übernehmen.Streamen von Nachrichten von einem Kafka-Cluster zu einem anderen

Nehmen wir an, die WordCount-Demo ist auf einer Kafka-Instanz auf einem anderen PC als meinem. Ich habe auch eine Kafka-Instanz, die auf meinem lokalen Rechner läuft.
Jetzt möchte ich die WordCount Demo auf dem Thema ("remote") laufen lassen, die die Sätze enthält, welche Wörter gezählt werden sollen.
Die Zählung sollte jedoch in ein Thema in meinem lokalen System anstatt in ein "Remote" -Thema geschrieben werden.

Ist so etwas mit der Kafka-Streams-API machbar?
Zum Beispiel

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig) 
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig) 
val wordCounts: KTable[String, Long] = textLines 
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) 
    .groupBy((_, word) => word) 
    .count("word-counts") 

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig) 

val streams: KafkaStreams = new KafkaStreams(builder) 
streams.start() 

Vielen Dank
- Tim

Antwort

4

Kafka Streams ist nur für einzelne Cluster aufzubauen.

Eine Problemumgehung ist die Verwendung eines foreach() oder ähnlichem und Instanziieren Sie Ihre eigenen KafkaProducer, die in den Zielcluster schreiben. Beachten Sie, dass Ihr eigener Producer sync schreiben muss! Andernfalls könnten Sie im Falle eines Fehlers Daten verlieren. Daher ist es keine sehr performante Lösung.

Es ist besser, das Ergebnis nur in den Quellcluster zu schreiben und die Daten in den Zielcluster zu replizieren. Beachten Sie, dass Sie sehr wahrscheinlich eine kürzere Aufbewahrungsdauer des Ausgabethemas im Quellcluster verwenden können, da die tatsächlichen Daten ohnehin im Zielcluster mit einer längeren Aufbewahrungszeit gespeichert werden. Dadurch können Sie den erforderlichen Speicherplatz im Quellcluster begrenzen.

+0

Vielen Dank Matthias, gut zu wissen! –

+0

Wie kann man die Nachrichten am besten synchron abrufen? Nur die '.get()' Methode für jede Nachricht aufrufen scheint ein bisschen "hacky". Gibt es dafür eine Eigenschaft? –

+1

Die Verwendung von 'get()' ist korrekt. –

3

Check out Replicator, die Matthias in seiner Antwort oben anspielt. Das passt zu dem, was du schön beschreibst.

+0

Perfekt! Vielen Dank –

Verwandte Themen