2017-11-15 1 views
1

Ich habe Zeitserienstatistiken für eine Live-Anwendung. So alte Daten haben keine Bedeutung. Ich möchte nur die Daten verarbeiten, die nach dem Start der Stream-App empfangen wurden, und nicht den zuvor festgeschriebenen Offset. Was ist der richtige Weg, um alte Einträge in der kafka stream App nach dem Neustart zu ignorieren?Kafka Streams App - alte Nachrichten beim Neustart ignorieren

Mit kafka Verbraucher API verwendete ich im Allgemeinen die seekToEnd() Methode auf die neueste Aufzeichnung überspringen vorwärts. Gibt es einen äquivalenten Mechanismus für Streams? Ich möchte vermeiden, durch alle Nachrichten seit dem letzten Commit zu filtern, um alte Nachrichten zu ignorieren.

Antwort

0

Sie können einen anderen Kunden mit Kafka Consumer API mit groupId wie dem applicationId für Kafka-Streams erstellen und diesen Consumer zu einem seekToEnd() vor dem Starten Ihres Streams verwenden. Deaktivieren Sie autoCommit für diesen speziellen Verbraucher und übergeben Sie den Offset manuell nach seekToEnd(). Dann starte deinen Stream.

Sicherstellen, dass der Strom nicht begonnen hat, bis die Offsets von Reset Verbraucher verpflichtet sind.