In Kafka (0.11.0.1) Bächen, eine Demo-Anwendung Play with a Streams ApplicationWie funktioniert KafkaStream.KTable Schreibdaten zu kafka Thema in (verdichtet) KV Stil
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count("Counts")
// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
Und Schritt 5, einige Daten nach Prozess, konnten wir siehe kompaktierten KV Paare (zB strömt 2) in sink Thema -Strom wordcount-Ausgang,
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
Die Frage ist, wie funktioniert K-Tabelle wordCounts in den obigen Daten schreiben Daten zum Thema Streams-Wordcount-Ausgabe in Key-Value-Stil?
Die Option cleanup.policy Themen Streams-wordcount-Ausgabe scheint den Standardwert, delete
, nicht compact
(via ist/kafka-configs.sh)
In der Anleitung wird ** streams-wordcount-output ** in Schritt 3 erstellt, wobei 'cleanup.policy' der Standardwert' delete' ist. – Jacky
Sie haben Recht. Ich habe eine PR https://github.com/apache/kafka/pull/3949 gemacht –