2017-01-17 2 views
3

Basierend auf apache Kafka docsKStream-to-KStream Joins are always windowed joins, meine Frage ist, wie kann ich die Größe des Fensters steuern? Ist diese Größe gleich groß, um Daten zu diesem Thema zu speichern? Oder wir können Daten zum Beispiel für 1 Monat behalten, aber nur für die vergangene Woche in den Stream einsteigen?Wie Kafka KStream zu Kstream-Fenster-Join zu verwalten?

Gibt es ein gutes Beispiel, um eine gefensterte kStream-zu-kStream Fensterverbindung anzuzeigen?

In meinem Fall sagen wir, ich habe 2 KStream, kstream1 und kstream2 Ich möchte 10 Tage von kstream1-30 Tagen kstream2 beitreten können.

Antwort

8

Das ist absolut möglich. Wenn Sie den Stream-Operator definieren, geben Sie explizit die Größe des Join-Fensters an.

KStream stream1 = ...; 
KStream stream2 = ...; 
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes 
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days 

stream1.leftJoin(stream2, 
       ... // add ValueJoiner 
       JoinWindows.of(joinWindowSizeMs) 
); 

// or if you want to use retention time 

stream1.leftJoin(stream2, 
       ... // add ValueJoiner 
       (JoinWindows)JoinWindows.of(joinWindowSizeMs) 
             .until(windowRetentionTimeMs) 
); 

Siehe http://docs.confluent.io/current/streams/developer-guide.html#joining-streams für weitere Details.

Das gleitende Fenster definiert grundsätzlich ein zusätzliches Join-Prädikat. In SQL-ähnlichen Syntax wäre dies so etwas wie:

SELECT * FROM stream1, stream2 
WHERE 
    stream1.key = stream2.key 
    AND 
    stream1.ts - before <= stream2.ts 
    AND 
    stream2.ts <= stream1.ts + after 

wo before == after == joinWindowSizeMs in diesem Beispiel. before und after können auch unterschiedliche Werte haben, wenn Sie JoinWindows#before() und JoinWindows#after() verwenden, um diese Werte explizit festzulegen.

Die Retentionszeit von Quellenthemen ist vollständig unabhängig von der angegebenen windowRetentionTimeMs, die auf ein Changelog-Thema angewendet wird, das von Kafka Streams selbst erstellt wurde. Die Fensteraufbewahrung ermöglicht die Verbindung von Out-of-Order-Datensätzen untereinander, dh, dass die Daten zu spät ankommen (beachten Sie, dass Kafka eine basierte Bestellgarantie hat, aber in Bezug auf Timestamps kann die Aufzeichnung out- von-Bestellung).

+0

Danke, ich werde es überprüfen und Ihre Antwort akzeptieren, wenn ich es ausführen konnte. und ich habe die meisten dieser Beispiele gelesen, die Sie erwähnt haben, aber ich konnte keine KStream Windowed Join – Am1rr3zA

+0

Auch finden. Wie kann ich unterschiedliche Fenstergröße angeben, da ich in meinem Fall 10 Tage Stream-1 mit 30 Tagen Stream-2 beitreten möchte – Am1rr3zA

+0

Sorry über die Beispiele. Scheint, es gibt nur KTable-Joins ... (dachte, es gibt auch einen KStream-K-Stream-Join). Sowieso. Über "10 Tage Stream-1 mit 30 Tagen Stream-2 verbinden": Dies ist mit Kafka Streams nicht möglich, da Kafka Streams nur Sliding-Window-Joins unterstützt - Sie benötigen einen Hopping-Window-Join. –

2

Zusätzlich zu dem, was Matthias J. Sax sagte, gibt es einen Strom-zu-Strom (mit Fenster) kommen beispielsweise bei: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java

Dies ist für Confluent 3.1.x mit Apache Kafka 0.10.1, dh Die neuesten Versionen sind ab Januar 2017 verfügbar. Unter dem Zweig master im obigen Repository finden Sie Codebeispiele, die neuere Versionen verwenden.

Hier ist der Schlüsselteil des obigen Codebeispiels (wiederum für Kafka 0.10.1), leicht an Ihre Frage angepasst. Beachten Sie, dass in diesem Beispiel ein OUTER JOIN angezeigt wird.

long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5); 
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30); 

final Serde<String> stringSerde = Serdes.String(); 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic"); 
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic"); 

KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents, 
    (impressionValue, clickValue) -> impressionValue + "/" + clickValue, 
    // KStream-KStream joins are always windowed joins, hence we must provide a join window. 
    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs), 
    stringSerde, stringSerde, stringSerde); 

// Write the results to the output topic. 
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic"); 
Verwandte Themen