geschickt erholen Wir erleben das folgende Szenario:Wie von Ausnahmen von producer.send() im Frühjahr Cloud-Stream-
- Wir haben einen Kafka-Cluster von 3 Knoten zusammengesetzt haben, jedes Thema erstellt hat 3 Partitionen
- eine Nachricht wird durch
MessageChannel.send()
gesendet, ein Rekord für die Herstellung von, sagen wir mal, Partition 1 - der Broker, die als die Partition Führer für diese Partition nicht
standardmäßig 0.123.gibt true
zurück und löst keine Ausnahme aus, selbst wenn der KafkaProducer schließlich die Nachricht nicht erfolgreich senden kann. Wir beobachten, etwa 30 Sekunden nach diesem Anruf, die folgende Nachricht in den Protokollen: Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time
In unserem Fall ist dies nicht akzeptabel, da wir sicher sein müssen, dass alle Nachrichten an Kafka, im Moment der Rückkehr des Anrufs zu MessageChannel.send()
.
Wir haben spring.cloud.stream.kafka.bindings.<channelName>.producer.sync
zu true
eingeschaltet, die genau wie die Dokumentation beschreibt. Es blockiert den Anrufer für die Bestätigung des Herstellers über den Erfolg oder den Ausfall der Lieferung (MessageTimeoutException
, InterruptedException
, ExecutionException
), alles gesteuert von KafkaProducerMessageHandler
. Es scheint der beste Ansatz für uns zu sein, da die Leistungsauswirkungen in unserem Fall vernachlässigbar sind.
Aber müssen wir uns selbst um den Versuch kümmern, wenn eine Ausnahme ausgelöst wird? (In unserem Client-Code mit @Retryable
zum Beispiel)
Hier ist ein einfaches Projekt zu experimentieren: https://github.com/phdezann/spring-cloud-bus-kafka-helloworld