2017-09-21 1 views
0

In Kafka (0.11.0.1) Bächen, eine Demo-Anwendung Play with a Streams ApplicationWie funktioniert KafkaStream.KTable Schreibdaten zu kafka Thema in (verdichtet) KV Stil

// Serializers/deserializers (serde) for String and Long types 
final Serde<String> stringSerde = Serdes.String(); 
final Serde<Long> longSerde = Serdes.Long(); 

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values 
// represent lines of text (for the sake of this example, we ignore whatever may be stored 
// in the message keys). 
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input"); 

KTable<String, Long> wordCounts = textLines 
    // Split each text line, by whitespace, into words. 
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) 

    // Group the text words as message keys 
    .groupBy((key, value) -> value) 

    // Count the occurrences of each word (message key). 
    .count("Counts") 

    // Store the running counts as a changelog stream to the output topic. 
    wordCounts.to(stringSerde, longSerde, "streams-wordcount-output"); 

Und Schritt 5, einige Daten nach Prozess, konnten wir siehe kompaktierten KV Paare (zB strömt 2) in sink Thema -Strom wordcount-Ausgang,

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
    --topic streams-wordcount-output \ 
    --from-beginning \ 
    --formatter kafka.tools.DefaultMessageFormatter \ 
    --property print.key=true \ 
    --property print.value=true \ 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

all  1 
streams 1 
lead 1 
to  1 
kafka 1 
hello 1 
kafka 2 
streams 2 

Die Frage ist, wie funktioniert K-Tabelle wordCounts in den obigen Daten schreiben Daten zum Thema Streams-Wordcount-Ausgabe in Key-Value-Stil?

Die Option cleanup.policy Themen Streams-wordcount-Ausgabe scheint den Standardwert, delete, nicht compact (via ist/kafka-configs.sh)

Antwort

1

Alle Ein- und Ausgang Themen zu sein sind "out of scope" von Kafka Streams. Es liegt in der Verantwortung des Benutzers, diese Themen zu erstellen und zu konfigurieren.

So wird Ihr Thema "streams-wordcount-output" die Konfiguration haben, die Sie beim Erstellen des Themas angegeben haben.

vgl. https://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application

+1

In der Anleitung wird ** streams-wordcount-output ** in Schritt 3 erstellt, wobei 'cleanup.policy' der Standardwert' delete' ist. – Jacky

+0

Sie haben Recht. Ich habe eine PR https://github.com/apache/kafka/pull/3949 gemacht –