2017-12-15 8 views
2

Ich möchte die Nachrichten für x keine Zeiten erneut veröffentlichen, die nicht verarbeitet werden konnten. Ich sehe den manuellen Commit-Code:So führen Sie eine fehlgeschlagene Nachricht in reaktiven Kafka erneut aus 0.8.2.2

val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties) 
Source.fromPublisher(consumerWithOffsetSink.publisher) 
.map(processMessage(_)) // your message processing 
.to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit 
.run() 

Wie jedoch Ausnahmen in der Methode processMessage() behandeln? Ich möchte die Ausnahme behandeln und kafka bitten, die Nachricht dreimal zu wiederholen. Wenn es nach 3 Mal immer noch fehlschlägt, verwerfen Sie es.

Antwort

0

Erwägen Sie, eine Warteschlange für nicht zustellbare Nachrichten zu definieren (andere Speicher wie DB ist auch in Ordnung), um diese Nachrichten zu speichern, und dann könnten wir versuchen, sie zu analysieren und zu lösen.

0

Ich glaube, Sie sind in der Lage, zumindest einmal zu lesen, aber Sie wollen nicht dauerhaft im Falle von Fehlern stoppen und möchten lieber den Prozess mit dem Rest der Nachrichten nach 3 Wiederholungen der fehlgeschlagene Nachricht

Dies könnte leicht mit einfachen Verbraucher in Apache Kafka erreicht werden. Lassen Sie mich wissen, ob das eine Option ist und ich würde das erklären.

Im Fall von reaktiven Kafka, nach drei Wiederholungen in processMessage, müssen wir einen Weg suchen, um zurückzugeben, was eine 'verarbeitete' Nachricht zurückgibt. Auf diese Weise würde das (Offset der fehlgeschlagenen Nachricht) beim nächsten Offset-Commit in die Senke (als ob es verarbeitet wurde) eingeschlossen sein und würde nicht wieder auftauchen.

Verwandte Themen