lesen Ich verwende Java API Consumer Connector. Wann immer der Verbraucher anfängt, von einem Thema zu lesen, liest er von Anfang an ein Thema und es dauert eine ganze Weile, bis er das neueste Ereignis einholt. Wie können wir sicherstellen, dass der Verbraucher den aktuellen Offset liest?Java: Wie aus dem aktuellen Offset zu lesen, wenn ich von einem kafka Thema
Antwort
für kafka 9:
- wenn Sie Gruppen-ID für Ihre Verbraucher gesetzt, kafka wird begangen Store (bearbeitet) Offsets für Sie. Dies funktioniert, wenn Sie neue Verbraucher in kafka verwenden read more
- , wenn Sie immer nach neuesten Offset lesen möchten, können Sie angeben, OffsetResetStrategy.LATEST
Um Natalia Antwort zu vervollständigen Ich würde sagen, dass Sie wahrscheinlich nicht kümmern sich um das Speichern von Offsets, Sie möchten immer nur die neuesten Nachrichten konsumieren.
Um dieses Verhalten mit den meisten Consumer-Implementierungen (einschließlich "alten" Verbraucher in 0.8.x und "neuen" Verbraucher in 0.9.x und höher) erreichen Sie 2 Dinge tun müssen, werden:
- Set Gruppen-ID zu einem zufälligen Wert, auf diese Weise kann der Benutzer jedes Mal, wenn er startet, keine Offsets von überall wiederherstellen, und dies löst die Anforderung "Offset zurücksetzen" aus.
- Setzen Sie
OffsetRequestStrategy
(oder wie auch immer es in dem von Ihnen verwendeten Client heißt) zulatest
, so dass, wenn Ihr Client nach verfügbarem Offset von Kafka fragt, den Offset für die letzte (neueste) Nachricht im Protokoll erhält.
Wenn das OP nicht daran interessiert ist Offsets zu speichern, wäre es nicht besser 'KafkaConsumer.assign()' anstelle von 'subscribe()' zu verwenden und dann 'seekToEnd '() '? – Harald
Vielleicht aber dann müsste er Partitionen bekommen, um sich selbst zuzuweisen – serejja
@serejja Ja, ich habe versucht, Gruppen-ID auf neuen Namen und (auto.offset.reset = größte). Es funktionierte. Aber ich hatte einige bestehende Konsumenten und ich wollte die gleiche Gruppen-ID für alle von ihnen haben. Können wir das Problem nicht beheben, dass wir dieselbe Gruppen-ID haben? – Priyanka
Der einfachste Weg ist auto-commit (dh auto.commit.enable=false
) zu deaktivieren und auto.offset.reset=latest
(oder =largest
für ältere Kafka-Versionen) in Ihrem Verbraucher Konfiguration zu verwenden.
Die Strömung in Kafka sich wie folgt:
- Start Verbraucher
- Verbraucher offse
- für eine gültige verpflichtet sieht, wenn gefunden, setzt es die Verarbeitung von dort
- wenn nicht gefunden, Starten Sie die Verarbeitung gemäß "auto.offset.reset"
Solange also ein gültiger Commit-Offset für Ihre Consumer-Gruppe vorliegt, hat "auto.offset.reset" keinerlei Auswirkungen. Daher sollten Sie auch nicht manuell festlegen.
Wenn bereits ein Offset festgeschrieben ist, müssen Sie ihn manuell löschen, bevor Sie den Consumer neu starten, wenn Sie vom aktuellen Offset lesen und keine und alte Daten verarbeiten möchten. (Oder verwenden Sie eine neue group.id
, für die Sie wissen, dass es keinen festgeschriebenen Offset gibt.)
Als Alternative zu all dem können Sie auch "suchen zu beenden" jeder Partition in Ihrem Consumer. Dies macht Ihren Code jedoch komplexer und kann vermieden werden, wenn für Ihre Verbrauchergruppe überhaupt keine Festschreibung erfolgt.
warum sollten wir nicht manuell festlegen? wann wird commit passieren, wenn wir es nicht manuell machen. Ich habe versucht, (auto.commit.enable = false) und uto.offset.reset = largest zu setzen und habe die gleiche Gruppen-ID wie zuvor, aber es liest immer noch von Anfang an. – Priyanka
Meine Antwort wurde aktualisiert. Macht das Sinn? –
Für Kafka 0.10 (und möglicherweise auch früher) Sie können dies tun:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());
Dies schaltet den Verbraucher auf den Brokern Offset-Speicher (da Sie sie nicht verwenden) und sucht auf die neueste Position aller Partitionen.
Sie können hinzufügen, dass es notwendig ist, eine Verbrauchergruppe zu verwenden, die den Lese-Offset nicht bereits festgeschrieben hat. – Schleichardt
True, es wird keinen vorhandenen gespeicherten Offset entfernen. Allerdings würde nicht mit Suche nur überschreiben dies? Ich erkannte, dass das OP nicht definierte, was sie mit "aktueller Offset" meint. Meine Antwort geht davon aus, dass sie das letzte Mal veröffentlicht werden möchte. Wenn sie "zuletzt verbraucht" meint, muss die automatische Festschreibung aktiviert werden und der Name der Kundengruppe muss bei jedem Lauf identisch sein. – AutomatedMike
- 1. Tools zum Lesen von Verbraucher-Offset von Kafka 0.9
- 2. Kafka Thema vs Partition Thema
- 3. Kafka Java API Offset Operationen Klärung
- 4. Kafka Streams: Wie schreibe ich zu einem Thema?
- 5. Nachrichten von einem Kafka-Thema zu einem anderen Kafka-Thema replizieren
- 6. Kafka-Knoten Start Verbrauch von letzten Offset
- 7. Kafka Thema Partitionen zu Spark-Streaming
- 8. Nur Tasten von Kafka lesen
- 9. Wie wird man spätestens nach einem kafka Thema für eine Partition Offset?
- 10. flink Lesen von Daten aus kafka
- 11. Kafka Broker vs Thema
- 12. Kafka - Einfachste Möglichkeit, den neuesten Offset zu erhalten
- 13. Lesen von InputStream aus dem Java-Prozess
- 14. Thema erstellen Apache Kafka
- 15. Kafka Verbraucher Offset Max Wert?
- 16. Wie kann ich bei einem Java InputStream den aktuellen Offset im Stream ermitteln?
- 17. um die Existenz von Kafka Thema in Nodejs zu überprüfen
- 18. Wie neue entires aus einer Datei zu lesen, mit Kafka
- 19. Speicher von ".exe" + Offset lesen?
- 20. Kafka Consumer Group Offset Retention
- 21. Kafka-Thema für spezifischen Datensatz abfragen
- 22. mit Jmx Monitor Kafka Thema
- 23. Wie lade ich ein Kafka-Thema in HDFS?
- 24. Apache Kafka - KafkaStream auf Thema/partition
- 25. Wie sende ich mit kafka producer Nachrichten an dasselbe Thema?
- 26. Lesen von früher Offset Apache Storm
- 27. Wie erstelle ich Kafka ZKStringSerializer in Java?
- 28. Wie sollen wir die Kafka-Themen in einem bestimmten Zeitbereich lesen?
- 29. VB.Net txt-Datei aus dem aktuellen Verzeichnis lesen
- 30. wie Hintergrundfarbe von aktuellen Thema zu erhalten programmatisch
OffsetResetStrategy wird ignoriert, wenn Sie bereits Offset gespeichert haben – serejja