2017-11-17 3 views
0

Ich benutze Kafka 0.11.0.0. Ich habe ein Testprogramm, das zu einem Kafka-Thema veröffentlicht; Wenn der Zoowärter und die Kafka-Server ausgefallen sind (was in meiner Entwicklungsumgebung normal ist; ich bringe sie nach Bedarf auf), dann bleibt der Anruf bei KafkaProducer <> .send() unbegrenzt lange hängen.Kafka Produzent sendet Blöcke auf unbestimmte Zeit, wenn Kafka Server heruntergefahren sind

Ich muss entweder send() zurückgeben, vorzugsweise den Fehler angeben; oder ich muss prüfen, ob die Server hoch oder runter sind. Grundsätzlich möchte ich, dass mein Testwerkzeug mir sagen kann: "Hey, Dummy, starte Kafka!" anstatt zu hängen.

Gibt es eine Möglichkeit für meine Producer-Aufgabe festzustellen, ob die Server hoch oder runter sind?

Ich rufe die send() wie folgt aus:

kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, 
    message), (rm, ex) -> { 
     System.out.println("**** " + rm + "\n**** " +ex); 
}); 

Ich habe linger.ms = 1; Ich habe versucht, Wiederholungen = 0, 1 und 2, und send() blockiert immer noch. Ich habe den Rückruf nie genannt.

Ältere Nachrichten schlagen vor, metadata.fetch.timeout.ms auf einen kleinen Wert zu setzen, aber das ist in 0.11 verschwunden. Andere schlagen vor, Befehlszeilenprogramme aufzurufen, um zu sehen, ob die Server in Ordnung sind ... aber die referenzierten Dienstprogramme scheinen ebenfalls verschwunden zu sein.

Was ist die anmutige Weise, dies zu tun?

+0

Das ist seltsam.Es sollte mit einem Fehler zurückkommen, der entweder "Aktualisierung der Metadaten fehlgeschlagen" oder "Abgelaufene x Anzahl der Datensätze" anzeigt. Überprüfen Sie die Einstellungen request.timeout.ms und max.block.ms für Ihren Producer. Standardmäßig ist request.timeout.ms 60 Sekunden lang – Shades88

+0

request.timeout.ms ist 30000 und max.block.ms ist 60000. Ich werde versuchen, diese zu verringern. (Wenn interaktiv gearbeitet wird, können 30 Sekunden auch unbestimmt sein - mein Fehler.) –

+0

OK, ja, das löst mein unmittelbares Problem; Danke! –

Antwort

0

Wir können Nachrichten senden auf drei Arten vermitteln:

Feuer-und-vergessen: Wir senden eine Nachricht an den Server und kümmern sich nicht wirklich, wenn es erfolgreich ist oder nicht ankommt. Die meiste Zeit wird es erfolgreich ankommen, da Kafka hochverfügbar ist und der Produzent das Versenden von Nachrichten automatisch wiederholt. Mit dieser Methode gehen jedoch einige Nachrichten verloren.

Synchron senden Wir eine Nachricht senden, die send() Methode gibt ein Objekt Zukunft, und wir verwenden get() über die Zukunft zu warten und sehen, ob die send() erfolgreich war oder nicht.

Asynchronous senden Wir die send() Methode mit einer Callback-Funktion aufrufen, die ausgelöst wird, wenn er eine Antwort vom Broker Kafka empfängt.

Die einfachste Möglichkeit, eine Nachricht synchron zu senden ist wie folgt:

ProducerRecord<String, String> record = 
      new ProducerRecord<>(KAFKA_TOPIC, KEY, message); 
    try { 
      producer.send(record).get(); 
    } catch (Exception e) { 
      e.printStackTrace(); 
    } 

Hier verwenden wir Future.get() auf eine Antwort von Kafka zu warten. Diese Methode löst eine Ausnahme aus, wenn der Datensatz nicht erfolgreich an Kafka gesendet wurde. Wenn keine Fehler aufgetreten sind, erhalten wir ein RecordMetadata-Objekt, mit dem wir den Offset abrufen können, in den die Nachricht geschrieben wurde.

hoffe das hilft.

1

Das ist seltsam. Es sollte mit einem Fehler zurückkommen, der entweder "Aktualisierung der Metadaten fehlgeschlagen" oder "Abgelaufene x Anzahl der Datensätze" anzeigt.

Überprüfen Sie request.timeout.ms und max.block.ms Einstellung für Ihren Hersteller. Standardmäßig request.timeout.ms ist 60 Sekunden lang

Verwandte Themen