2

Ich bin neu bei Kafka und versuche, eine Producer - Consumer App mit Kafka zu bauen. Hier kann ich Nachrichten an kalka ausgeben, aber wenn ich versuche, sie mit einem Verbraucher wieder zu verbrauchen, gibt es 0 Datensätze zurück.Kafka kann nicht konsumieren, ohne von Anfang zu lesen -Java

Ich überprüfe Offset für meine Verbrauchergruppe, ich kann sehen, dass Offset ist gleich Log-Länge sind die gleichen (1M in meinem Fall - das gleiche wie die Anzahl der Datensätze).

Wenn ich diese config -Eigenschaft beim Erstellen meines Verbrauchers verwenden, lesen Sie von Anfang an.

configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Aber meine Forderung ist, wenn ich Verbraucher neu zu starten, sollte es aus dem vorherigen Endpunkt wie AMQ starten.

Gibt es etwas, das ich hier vermisse? Ich denke, der Offset sollte sich erst nach einer Verbraucherumfrage ändern. Warum ist es am Anfang selbst auf die maximale Länge eingestellt?

+0

http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management/19390/how-can-i-read-topic-from-its-beginning#t=201703281614275445343 –

+0

@ MatthiasJ.Sax Vielen Dank für Ihre Antwort. Aber nach dem Link, den Sie geteilt haben, verstehe ich, dass der Verbraucher den Offset nicht ändert, den ich von dem Thema abrufe und konsumiere. Aber mein Fall-Offset wird eingestellt, um zu enden, bevor ich irgendwelche Aufzeichnungen vom Thema konsumiere. Mache ich etwas falsch? –

+0

Das stimmt. "Suchen" zu einer Position wird immer faul gemacht. Ich habe unten eine ausführlichere Antwort gegeben. Lass es mich wissen, wenn es hilft. Kafka Consumer ist ein wenig schwierig zu verstehen, wenn Sie gerade erst anfangen - es ist anders (und besser) als das, was Sie mit anderen Systemen tun können. Aber so, (am Anfang) auch ein wenig schwieriger, richtig zu bekommen :) –

Antwort

4

Als Bindeglied beschreibt es gibt einige Szenarien, die Sie berücksichtigen müssen:

  1. einen neuen Verbraucher Start (neu group.id): Für diesen Fall wird es keinen Offset begangen und damit der Verbraucher Start entsprechend der Parametereinstellung lesen auto.offset.reset

  2. Neustart eines Verbrauchers (Wiederverwendung von group.id): In diesem Fall wird der Verbraucher dort fortfahren, wo er aufgehört hat. Die Parametereinstellung auto.offset.reset wird ignoriert.

So, für Szenario (1) können Sie nur Ihre Startposition "konfigurieren". Für Szenario (2) ist Ihre Startposition "fest" (dh immer letzter festgeschriebener Offset) und dies kann nicht über eine Konfiguration geändert werden. Sie können jedoch immer eine .seekToBeginning() oder .seekToEnd() vor dem ersten Aufruf an poll() tun und entweder das gesamte Thema lesen oder am Ende des Themas beginnen. Ein Aufruf an .seekXX() "überschreibt" den letzten festgeschriebenen Offset und ermöglicht es Ihnen, mit jedem Offset zu beginnen, den Sie mögen. Beachten Sie, dass es auch seek() gibt, die "Offset-Parameter" übernehmen, so dass Sie jeden Offset angeben können, von dem aus Sie beginnen möchten.

+1

Ich verstehe, wie Offset funktioniert jetzt aus Ihrer Antwort und den Link, den Sie geteilt haben. Aber was ich nicht bekomme ist, dass ich 1M zu einem Thema produziere, indem ich einen Produzenten verwende. Ich habe einen Consumer mit eindeutiger Gruppen-ID 'configProperties.put (ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID(). ToString());' erstellt. Dies ist immer noch verbraucht alle Daten aus dem Thema ohne AUTO_OFFSET_RESET_CONFIG auf frühestens gesetzt. –

+0

Im Normalfall, wenn ich einen neuen Verbraucher mit neuer Gruppen-ID starte, sollte es von Anfang an beginnen, ohne dass ich den Offset nach rechts korrigieren muss? –

+1

Wenn Sie 'auto.offset.reset' nicht setzen und einen neuen Benutzer (dh neue Gruppen-ID) verwenden, sollte es vom Ende her gelesen werden ...Nicht sicher, warum es das ganze Thema liest ... (Ich nehme an, Sie haben keinen laufenden Produzenten, wenn Sie Ihren Verbraucher beginnen). –

Verwandte Themen