Ich möchte die Nachrichten für x keine Zeiten erneut veröffentlichen, die nicht verarbeitet werden konnten. Ich sehe den manuellen Commit-Code:So führen Sie eine fehlgeschlagene Nachricht in reaktiven Kafka erneut aus 0.8.2.2
val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffsetSink.publisher)
.map(processMessage(_)) // your message processing
.to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
.run()
Wie jedoch Ausnahmen in der Methode processMessage() behandeln? Ich möchte die Ausnahme behandeln und kafka bitten, die Nachricht dreimal zu wiederholen. Wenn es nach 3 Mal immer noch fehlschlägt, verwerfen Sie es.