2017-12-22 3 views
1

geschickt erholen Wir erleben das folgende Szenario:Wie von Ausnahmen von producer.send() im Frühjahr Cloud-Stream-

  • Wir haben einen Kafka-Cluster von 3 Knoten zusammengesetzt haben, jedes Thema erstellt hat 3 Partitionen
  • eine Nachricht wird durch MessageChannel.send() gesendet, ein Rekord für die Herstellung von, sagen wir mal, Partition 1
  • der Broker, die als die Partition Führer für diese Partition nicht

standardmäßig 0.123.gibt true zurück und löst keine Ausnahme aus, selbst wenn der KafkaProducer schließlich die Nachricht nicht erfolgreich senden kann. Wir beobachten, etwa 30 Sekunden nach diesem Anruf, die folgende Nachricht in den Protokollen: Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

In unserem Fall ist dies nicht akzeptabel, da wir sicher sein müssen, dass alle Nachrichten an Kafka, im Moment der Rückkehr des Anrufs zu MessageChannel.send().

Wir haben spring.cloud.stream.kafka.bindings.<channelName>.producer.sync zu true eingeschaltet, die genau wie die Dokumentation beschreibt. Es blockiert den Anrufer für die Bestätigung des Herstellers über den Erfolg oder den Ausfall der Lieferung (MessageTimeoutException, InterruptedException, ExecutionException), alles gesteuert von KafkaProducerMessageHandler. Es scheint der beste Ansatz für uns zu sein, da die Leistungsauswirkungen in unserem Fall vernachlässigbar sind.

Aber müssen wir uns selbst um den Versuch kümmern, wenn eine Ausnahme ausgelöst wird? (In unserem Client-Code mit @Retryable zum Beispiel)

Hier ist ein einfaches Projekt zu experimentieren: https://github.com/phdezann/spring-cloud-bus-kafka-helloworld

Antwort

0

Wenn die send() auf dem @StreamListener Thread ausgeführt wird, und die Ausnahme ausgelöst wird zurück auf das Bindemittel, das Bindemittel Wiederholungs Konfiguration wird Wiederholungen durchführen.

Da Sie jedoch das Senden in einem HTTP-Thread ausführen, müssen Sie einen erneuten Versuch durchführen (Aufruf im Rahmen einer RetryTemplate() senden) oder die Controller-Methode @Retryable.

Verwandte Themen