2017-08-10 2 views
1

Ich habe einen Kafka-Lesestream, den ich überprüfe, wenn ein bestimmter Schwellwert überschritten wird. Ich möchte nur eine Warnung verbreiten, wenn sie zum ersten Mal überschritten wird. Um dies zu erreichen, berechne ich zuerst den neuen Zustand, gruppiere die neuen Zustände in einem KGroupedStream. Reduziere dann auf eine KTable, wo ich überprüfe, ob sich der Status geändert hat (behalte einen booleschen Wert bei) und ändere den changelog-Stream und filtere die Datensätze, in denen sich der Status geändert hat.Kafka Stream API: KStream zu KGroupedStream zu KTable zu KStream

Meine Theorie war, dass dies funktionieren sollte, aber nicht jede Statusänderung wird in den Changelog-Stream propagiert, sondern nur gelegentlich der Changelog-Stream aktualisiert wird (kann ein Muster wirklich nicht sehen). Wer weiß warum, oder besser, wie ich dieses Problem lösen kann?

Vereinfachtes Beispiel:

KStream<String, String> inputStream = builder.stream("input"); 
KStream<String, String> outputStream = inputStream 
    .groupByKey() 
    .reduce((previousValue, newValue) -> newValue) 
    .toStream(); 
outputStream.to("output"); 

In diesem Fall ich, dass jeder neu eingehenden Wert auf den Ausgabestrom setzen würde erwarten würde. Dies ist jedoch nicht der Fall, nur ab und zu wird ein Wert in den Ausgabestrom geschrieben.

Antwort

2

Ich vermute, dass Sie die Cache-Pufferung aktiviert haben (es ist standardmäßig). Versuchen Sie, die folgenden Eigenschaften für Ihre Stream-Konfiguration zu konfigurieren.

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 
+0

Ehrfürchtig, so offensichtlich man kennt es, danke! :) –

Verwandte Themen