2017-06-23 4 views
2

Ich benutze Kafka Stream und ich versuche, eine KTable in ein Thema zu materialisieren.Apache Kafka Streams Materializing KTables zu einem Thema scheint langsam

Es funktioniert, aber es scheint alle 30 Sekunden oder so getan werden.

Wie/Wann entscheidet sich Kafka Stream, den aktuellen Zustand einer KTable in ein Thema zu materialisieren?

Gibt es eine Möglichkeit, diese Zeit zu verkürzen und sie "in Echtzeit" zu machen?

Hier ist der eigentliche Code I

bin mit
// Stream of random ints: (1,1) -> (6,6) -> (3,3) 
// one record every 500ms 
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC); 

// grouping by key 
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer()); 

// same behaviour with or without the TimeWindow 
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total"); 

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC); 
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC); 
+0

Könnten Sie mehr klar sein, auf das, was Sie erreichen wollen? Irgendein Code? Meinst du, du machst so etwas wie: ktable.to ("topic_name")? –

+0

ja das ist genau das, was ich tue –

+1

Ok, ich verstehe dein Problem, aber leider habe ich diesen speziellen Fall noch nie abgestimmt (ich werde tatsächlich etwas Ähnliches brauchen, sobald wir eine Lösung implementieren, die dies erfordert, um sehr zu sein schnell/aktuell) mit dem gesagt, ich würde beginnen mit den verfügbaren Konfigurationen zu spielen: http://kafka.apache.org/documentation/#configuration Möglicherweise zuerst mit dem folgenden: Broker Level: log.flush. * Einsen, Offsets .commit.timeout.ms, Producer Timeouts. Themenebene: Flush. * Einsen Stream-Ebene: commit. * Einsen Wenn Sie eine Lösung finden Sie hier, es wird nützlich sein –

Antwort

2

Dies wird durch commit.interval.ms gesteuert wird, die standardmäßig auf 30 Sekunden. Weitere Details hier: http://docs.confluent.io/current/streams/developer-guide.html

Die Semantik-Caching ist, dass Daten an die Zustandsspeicher gespült werden und mit dem nächsten Downstream-Prozessorknoten weitergeleitet, wenn die frühesten commit.interval.ms oder cache.max.bytes.buffering (Cache-Druck) trifft.

und hier:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

+0

Super, danke Michal –

Verwandte Themen