Das ist absolut möglich. Wenn Sie den Stream-Operator definieren, geben Sie explizit die Größe des Join-Fensters an.
KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
stream1.leftJoin(stream2,
... // add ValueJoiner
JoinWindows.of(joinWindowSizeMs)
);
// or if you want to use retention time
stream1.leftJoin(stream2,
... // add ValueJoiner
(JoinWindows)JoinWindows.of(joinWindowSizeMs)
.until(windowRetentionTimeMs)
);
Siehe http://docs.confluent.io/current/streams/developer-guide.html#joining-streams für weitere Details.
Das gleitende Fenster definiert grundsätzlich ein zusätzliches Join-Prädikat. In SQL-ähnlichen Syntax wäre dies so etwas wie:
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts
AND
stream2.ts <= stream1.ts + after
wo before == after == joinWindowSizeMs
in diesem Beispiel. before
und after
können auch unterschiedliche Werte haben, wenn Sie JoinWindows#before()
und JoinWindows#after()
verwenden, um diese Werte explizit festzulegen.
Die Retentionszeit von Quellenthemen ist vollständig unabhängig von der angegebenen windowRetentionTimeMs
, die auf ein Changelog-Thema angewendet wird, das von Kafka Streams selbst erstellt wurde. Die Fensteraufbewahrung ermöglicht die Verbindung von Out-of-Order-Datensätzen untereinander, dh, dass die Daten zu spät ankommen (beachten Sie, dass Kafka eine basierte Bestellgarantie hat, aber in Bezug auf Timestamps kann die Aufzeichnung out- von-Bestellung).
Danke, ich werde es überprüfen und Ihre Antwort akzeptieren, wenn ich es ausführen konnte. und ich habe die meisten dieser Beispiele gelesen, die Sie erwähnt haben, aber ich konnte keine KStream Windowed Join – Am1rr3zA
Auch finden. Wie kann ich unterschiedliche Fenstergröße angeben, da ich in meinem Fall 10 Tage Stream-1 mit 30 Tagen Stream-2 beitreten möchte – Am1rr3zA
Sorry über die Beispiele. Scheint, es gibt nur KTable-Joins ... (dachte, es gibt auch einen KStream-K-Stream-Join). Sowieso. Über "10 Tage Stream-1 mit 30 Tagen Stream-2 verbinden": Dies ist mit Kafka Streams nicht möglich, da Kafka Streams nur Sliding-Window-Joins unterstützt - Sie benötigen einen Hopping-Window-Join. –