2016-07-29 8 views

Antwort

0

für kafka 9:

  1. wenn Sie Gruppen-ID für Ihre Verbraucher gesetzt, kafka wird begangen Store (bearbeitet) Offsets für Sie. Dies funktioniert, wenn Sie neue Verbraucher in kafka verwenden read more
  2. , wenn Sie immer nach neuesten Offset lesen möchten, können Sie angeben, OffsetResetStrategy.LATEST
+1

OffsetResetStrategy wird ignoriert, wenn Sie bereits Offset gespeichert haben – serejja

0

Um Natalia Antwort zu vervollständigen Ich würde sagen, dass Sie wahrscheinlich nicht kümmern sich um das Speichern von Offsets, Sie möchten immer nur die neuesten Nachrichten konsumieren.

Um dieses Verhalten mit den meisten Consumer-Implementierungen (einschließlich "alten" Verbraucher in 0.8.x und "neuen" Verbraucher in 0.9.x und höher) erreichen Sie 2 Dinge tun müssen, werden:

  1. Set Gruppen-ID zu einem zufälligen Wert, auf diese Weise kann der Benutzer jedes Mal, wenn er startet, keine Offsets von überall wiederherstellen, und dies löst die Anforderung "Offset zurücksetzen" aus.
  2. Setzen Sie OffsetRequestStrategy (oder wie auch immer es in dem von Ihnen verwendeten Client heißt) zu latest, so dass, wenn Ihr Client nach verfügbarem Offset von Kafka fragt, den Offset für die letzte (neueste) Nachricht im Protokoll erhält.
+0

Wenn das OP nicht daran interessiert ist Offsets zu speichern, wäre es nicht besser 'KafkaConsumer.assign()' anstelle von 'subscribe()' zu verwenden und dann 'seekToEnd '() '? – Harald

+0

Vielleicht aber dann müsste er Partitionen bekommen, um sich selbst zuzuweisen – serejja

+0

@serejja Ja, ich habe versucht, Gruppen-ID auf neuen Namen und (auto.offset.reset = größte). Es funktionierte. Aber ich hatte einige bestehende Konsumenten und ich wollte die gleiche Gruppen-ID für alle von ihnen haben. Können wir das Problem nicht beheben, dass wir dieselbe Gruppen-ID haben? – Priyanka

1

Der einfachste Weg ist auto-commit (dh auto.commit.enable=false) zu deaktivieren und auto.offset.reset=latest (oder =largest für ältere Kafka-Versionen) in Ihrem Verbraucher Konfiguration zu verwenden.

Die Strömung in Kafka sich wie folgt:

  1. Start Verbraucher
  2. Verbraucher offse
    • für eine gültige verpflichtet sieht, wenn gefunden, setzt es die Verarbeitung von dort
    • wenn nicht gefunden, Starten Sie die Verarbeitung gemäß "auto.offset.reset"

Solange also ein gültiger Commit-Offset für Ihre Consumer-Gruppe vorliegt, hat "auto.offset.reset" keinerlei Auswirkungen. Daher sollten Sie auch nicht manuell festlegen.

Wenn bereits ein Offset festgeschrieben ist, müssen Sie ihn manuell löschen, bevor Sie den Consumer neu starten, wenn Sie vom aktuellen Offset lesen und keine und alte Daten verarbeiten möchten. (Oder verwenden Sie eine neue group.id, für die Sie wissen, dass es keinen festgeschriebenen Offset gibt.)

Als Alternative zu all dem können Sie auch "suchen zu beenden" jeder Partition in Ihrem Consumer. Dies macht Ihren Code jedoch komplexer und kann vermieden werden, wenn für Ihre Verbrauchergruppe überhaupt keine Festschreibung erfolgt.

+0

warum sollten wir nicht manuell festlegen? wann wird commit passieren, wenn wir es nicht manuell machen. Ich habe versucht, (auto.commit.enable = false) und uto.offset.reset = largest zu setzen und habe die gleiche Gruppen-ID wie zuvor, aber es liest immer noch von Anfang an. – Priyanka

+0

Meine Antwort wurde aktualisiert. Macht das Sinn? –

1

Für Kafka 0.10 (und möglicherweise auch früher) Sie können dies tun:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumer = new KafkaConsumer<>(properties); 
consumer.seekToEnd(Collections.emptySet()); 

Dies schaltet den Verbraucher auf den Brokern Offset-Speicher (da Sie sie nicht verwenden) und sucht auf die neueste Position aller Partitionen.

+1

Sie können hinzufügen, dass es notwendig ist, eine Verbrauchergruppe zu verwenden, die den Lese-Offset nicht bereits festgeschrieben hat. – Schleichardt

+0

True, es wird keinen vorhandenen gespeicherten Offset entfernen. Allerdings würde nicht mit Suche nur überschreiben dies? Ich erkannte, dass das OP nicht definierte, was sie mit "aktueller Offset" meint. Meine Antwort geht davon aus, dass sie das letzte Mal veröffentlicht werden möchte. Wenn sie "zuletzt verbraucht" meint, muss die automatische Festschreibung aktiviert werden und der Name der Kundengruppe muss bei jedem Lauf identisch sein. – AutomatedMike

Verwandte Themen