2016-06-09 11 views
6

Wie erhalte ich den Offset eines Kafka Connect-Anschlusses/Task/Senke?Kafka Connect Offsets. Holen/Setzen?

Ich kann das /usr/bin/kafka-consumer-groups Tool verwenden, das kafka.admin.ConsumerGroupCommand läuft, um die Offsets für alle meine regulären Kafka Verbrauchergruppen zu sehen. Kafka Connect-Aufgaben und Gruppen werden jedoch mit diesem Tool nicht angezeigt.

Ähnlich kann ich die zoekeeper-Shell verwenden, um eine Verbindung zum Zookeeper herzustellen, und ich kann Zookeeper-Einträge für normale Kafka-Benutzergruppen sehen, aber nicht für Kafka Connect-Senken.

+0

Als (sehr schlecht) Vermeidung des Problems können Sie den Connector löschen und einen neuen Stecker unter einem anderen Namen registrieren. Das macht natürlich nur Sinn, wenn Sie das nicht regelmäßig machen müssen. – pederpansen

+0

[This] (https://stackoverflow.com/questions/45670937/kafka-0-11-how-to-reset-offsets) ist eine nette Erklärung, wie man die Offsets für eine Gruppe ändert. –

Antwort

7

Ab 0.10.0.0 bietet Connect keine API zum Verwalten von Offsets. Das wollen wir in Zukunft verbessern, aber noch nicht. Das ConsumerGroupCommand wäre das richtige Werkzeug, um Offsets für Sink-Anschlüsse zu verwalten. Beachten Sie, dass Source-Connector-Offsets in einem speziellen Offsets-Thema für Connect gespeichert werden (sie sind nicht wie normale Kafka-Offsets, da sie vom Quellsystem definiert sind, siehe offset.storage.topic in worker configuration docs) und da Sink Connectors den neuen Consumer verwenden, haben sie gewonnen. t ihre Offsets in Zookeeper speichern - alle modernen Clients verwenden nativen Kafka-basierten Offset-Speicher. Die ConsumerGroupCommand kann mit diesen Offsets arbeiten, Sie müssen nur die --new-consumer Option übergeben).

+1

Irgendwelche ETA oder Pläne für diese bitte? Kann https://issues.apache.org/jira/browse/KAFKA-4107 auch ohne Fortschritt oder Details sehen. – SemanticBeeng

2

Sie können keine Offsets festlegen, aber Sie können das Werkzeug kafka-consumer-groups.sh verwenden, um den Feed Forward "zu scrollen".

Die Verbrauchergruppe Ihres Stecker hat einen Namen von connect-*CONNECTOR NAME*, aber Sie können überprüfen, double: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list

Strom zu sehen, Offset: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe

der Vorwärts-Offset zu verschieben: unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null

Ich nehme an, dass Sie den Offset auch rückwärts verschieben können, indem Sie zuerst die Verbrauchergruppe löschen, indem Sie --delete Flag verwenden.

Vergessen Sie nicht, Ihren Connector über die Kafka Connect-REST-API anzuhalten und fortzusetzen.

0

In meinem Fall (Testdateien in Hersteller Lesen und verbrauchen in der Konsole, die alle in nur lokal), ich sah nur diese Ausgabe in producer:

offset.storage.file.filename=/tmp/connect.offsets 

Also wollte ich es öffnen, aber es ist binär, mit einigen kaum erkennbaren Charakteren.

Ich löschte es (umbenennen es auch funktioniert), und dann kann ich in die gleiche Datei schreiben und den Dateiinhalt vom Verbraucher wieder erhalten. Sie müssen den Konsolenhersteller neu starten, damit er wirksam wird, weil er versucht, die Offset-Datei zu lesen. Wenn nicht, erstellen Sie eine neue, damit der Offset zurückgesetzt wird.

Wenn Sie es ohne Löschen zurücksetzen möchten, können Sie verwenden:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name> 

Sie alle Gruppennamen überprüfen können:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

und prüfen Details der einzelnen Gruppen:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe 

In der Produktionsumgebung wird dieser Offset von zookeeper verwaltet, also mehr Schritte (und c aution) wird benötigt. Sie können auf diese Seite verweisen:

https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

Schritte:

kafka-topics --list --zookeeper localhost:2181 
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest 

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset} 
Verwandte Themen