2017-07-09 4 views
0

Ich benutze Kafka Streams und möchte einige Verbraucher Offset von Java an den Anfang zurückgesetzt. KafkaConsumer.seekToBeginning(...) klingt wie das Richtige zu tun, aber ich arbeite mit Kafka Streams:Reset Verbraucher-Offset an den Anfang von Kafka Streams

KafkaStreams streams = new KafkaStreams(builder, props); 
... 
streams.start(); 

Ich denke, dass auf dem Beton je Pipeline Ströme I definieren diese mehrere Verbraucher unter der Haube schaffen würde. Kann ich Zugang zu diesen bekommen? Oder gibt es eine andere Möglichkeit, Offsets programmgesteuert zurückzusetzen?

Antwort

1

Da Sie Kafka-Streams verwenden, sollten Sie nicht nur die Consumer-Offsets zurücksetzen, sondern auch den internen Statusspeicher von Streams.

Glücklicherweise gibt es ein Streams Application Reset Tool mit Kafka zur Verfügung gestellt.

Siehe https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool

+1

Das ist, was ich bereits kenne, siehe oben ... Das Schlüsselwort hier ist programmgesteuert. – micgn

+0

Nun das Reset-Tool ist ein Programm und es ist Open-Source, so die Antwort ist ja, Sie können programmgesteuert sowohl Consumer Offsets und Streams State speichert. –

0

Aufbauend auf Hans Jespersens Antwort, ich diesen Code erfolgreich verwendet zu tun, was das Skript tut in Java-Code:

import kafka.tools.StreamsResetter; 

StreamsResetter resetter = new StreamsResetter(); 
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER}; 
resetter.run(args); 

Die Klasse Teil der kafkas Core-Bibliothek, die ich importiert in maven mit:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.12</artifactId> 
     <version>${kafka.version}</version> 
    </dependency> 
Verwandte Themen