2017-05-31 5 views
2

Ich habe einen Kafka-Stream - sagen für Blogs und eine Kafka-Tabelle - sagen für Kommentare zu diesen Blogs. Der Schlüssel aus dem kafka-Stream kann mehreren Werten in der Kafka-Tabelle zugeordnet werden, d. H. Ein Blog kann mehrere Kommentare enthalten. Ich möchte eine Verbindung von diesen beiden machen und ein neues Objekt mit einem Array von Kommentar-IDs erstellen. Aber wenn ich den Join mache, enthält der Stream nur die letzte Kommentar-ID. Gibt es eine Dokumentation oder einen Beispielcode, der mir eine richtige Richtung weisen kann, wie dies zu erreichen ist? Gibt es im Grunde eine Dokumentation, in der die Beziehung zwischen Kafka-Stream und Kafka-Tabelle erläutert wird?Kafka Stream und Kafka Tabelle Eins zu viele Beziehung Join

Also anstelle von Kommentar - ich brauche eine Reihe von Kommentar-IDs.

Antwort

3

ich nicht ein Join-Methode mit einer Signatur finden, dass in Ihrem Code Beispiel passend, aber hier ist das, was ich denke, ist das Problem:

KTables als changlog interpretiert werden, das heißt, jede weitere Nachricht mit Derselbe Schlüssel wird als Aktualisierung des Datensatzes und nicht als neuer Datensatz interpretiert. Aus diesem Grund sehen Sie nur die letzte "Kommentar" -Nachricht für einen bestimmten Schlüssel (Blog-ID), die vorherigen Werte werden überschrieben. Um dies zu umgehen, müssen Sie zunächst ändern, wie Sie Ihr KTable auffüllen. Sie können Ihr Kommentarthema als KStream zu Ihrer Topologie hinzufügen und dann eine Aggregation durchführen, die einfach ein Array oder eine Liste von Kommentaren mit derselben Blog-ID erstellt. Diese Aggregation gibt eine KTable zurück, mit der Sie Ihrem Blog KStream beitreten können.

Hier ist eine Skizze, wie Sie es tun können, eine Liste bewertet K-Tabelle zu erstellen:

builder.stream("yourCommentTopic") // where key is blog id 
.groupByKey() 
.aggregate(() -> new ArrayList(), 
    (key, value, agg) -> new KeyValue<>(key, agg.add(value)), 
    yourListSerde); 

Eine Liste einfacher ist, in einer Aggregation als ein Array zu verwenden, so dass ich schlage vor, Sie es in ein Array umwandeln Downstream wenn benötigt. Außerdem müssen Sie im obigen Beispiel eine serde-Implementierung für Ihre Liste "yourListSerde" bereitstellen.

+0

Zur besseren Lesbarkeit würde ich hinzufügen, dass der Schlüssel von "yourCommentTopic" die entsprechende Blogpost-ID wäre. Der 'groupByKey'-Schritt stellt dann sicher, dass der nachfolgende 'Aggregat'-Schritt Zugriff auf alle Kommentare für einen bestimmten Blogpost hat (und somit eine Liste aller Kommentare erstellen kann). –

+0

Danke! Überarbeitete die Antwort zu diesem Zweck –