2016-11-10 4 views
4

den folgenden Code Gegeben:Warum sehe ich keine Ausgabe von Kafka Streams reduce?

KStream<String, Custom> stream = 
    builder.stream(Serdes.String(), customSerde, "test_in"); 

stream 
    .groupByKey(Serdes.String(), customSerde) 
    .reduce(new CustomReducer(), "reduction_state") 
    .print(Serdes.String(), customSerde); 

Ich habe eine println Anweisung innerhalb der Anwendung Methode des Reducer, die erfolgreich druckt, wenn ich die Reduktion stattfinden erwarten. Die oben gezeigte letzte Druckanweisung zeigt jedoch nichts an. Ebenso, wenn ich eine to Methode anstelle von print verwende, sehe ich keine Nachrichten im Zielthema.

Was brauche ich nach der reduce-Anweisung, um das Ergebnis der Reduktion zu sehen? Wenn ein Wert an den Eingang gesendet wird, erwarte ich nichts zu sehen. Wenn ein zweiter Wert mit dem gleichen Schlüssel gedrückt wird, erwarte ich, dass der Reduzierer angewendet wird (was er tut), und ich erwarte auch, dass das Ergebnis der Reduktion mit dem nächsten Schritt in der Verarbeitungspipeline fortfährt. Wie beschrieben, sehe ich nichts in den nachfolgenden Schritten der Pipeline und ich verstehe nicht warum.

+2

Versuchen zu einstellen 'StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG' 0. –

+0

@ MatthiasJ.Sax Dank zu schätzen ! Dies löste das Problem für mich, bitte zögern Sie nicht, es als Antwort zu posten, damit ich Ihnen die Punkte vergeben kann. Wenn Sie einen Link zu weiteren Informationen über dieses Konfigurationsdetail und ähnliches hinzufügen könnten, würde ich es auch sehr zu schätzen wissen. – LaserJesus

Antwort

7

Ab Kafka 0.10.1.0 verwenden alle Aggregationsoperatoren einen internen Deduplizierungscache, um die Last des Ergebnis-KTable-Changelogstreams zu reduzieren. Wenn Sie beispielsweise zwei Datensätze mit demselben Schlüssel direkt nacheinander zählen und verarbeiten, lautet der vollständige Changelog-Stream <key:1>, <key:2>.

Mit der neuen Caching-Funktion würde der Cache <key:1> empfangen und speichern, aber nicht sofort weiterleiten. Wenn <key:2> berechnet wird, ersetzt es den ersten Eintrag des Caches. Abhängig von der Cachegröße, der Anzahl der unterschiedlichen Schlüssel, des Durchsatzes und des Festschreibungsintervalls sendet der Cache Einträge nach dem System. Dies geschieht entweder bei der Cache-Räumung für einen einzelnen Schlüsseleintrag oder als eine vollständige Räumung des Caches (Senden aller Einträge stromabwärts). Das KTable Changelog zeigt also möglicherweise nur <key:2> (weil <key:1> wurde dupliziert).

Sie können die Größe des Caches über den Streams-Konfigurationsparameter StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG steuern. Wenn Sie den Wert auf Null setzen, deaktivieren Sie das Caching vollständig und das KTable-Changelog enthält alle Updates (wodurch ein Verhalten vor 0.10.1.0 bereitgestellt wird).

Confluent Dokumentation enthält einen Abschnitt, den Cache näher erläutern:

Verwandte Themen