Ich benutze Kafka 0.11.0.0. Ich habe ein Testprogramm, das zu einem Kafka-Thema veröffentlicht; Wenn der Zoowärter und die Kafka-Server ausgefallen sind (was in meiner Entwicklungsumgebung normal ist; ich bringe sie nach Bedarf auf), dann bleibt der Anruf bei KafkaProducer <> .send() unbegrenzt lange hängen.Kafka Produzent sendet Blöcke auf unbestimmte Zeit, wenn Kafka Server heruntergefahren sind
Ich muss entweder send() zurückgeben, vorzugsweise den Fehler angeben; oder ich muss prüfen, ob die Server hoch oder runter sind. Grundsätzlich möchte ich, dass mein Testwerkzeug mir sagen kann: "Hey, Dummy, starte Kafka!" anstatt zu hängen.
Gibt es eine Möglichkeit für meine Producer-Aufgabe festzustellen, ob die Server hoch oder runter sind?
Ich rufe die send() wie folgt aus:
kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
message), (rm, ex) -> {
System.out.println("**** " + rm + "\n**** " +ex);
});
Ich habe linger.ms = 1; Ich habe versucht, Wiederholungen = 0, 1 und 2, und send() blockiert immer noch. Ich habe den Rückruf nie genannt.
Ältere Nachrichten schlagen vor, metadata.fetch.timeout.ms auf einen kleinen Wert zu setzen, aber das ist in 0.11 verschwunden. Andere schlagen vor, Befehlszeilenprogramme aufzurufen, um zu sehen, ob die Server in Ordnung sind ... aber die referenzierten Dienstprogramme scheinen ebenfalls verschwunden zu sein.
Was ist die anmutige Weise, dies zu tun?
Das ist seltsam.Es sollte mit einem Fehler zurückkommen, der entweder "Aktualisierung der Metadaten fehlgeschlagen" oder "Abgelaufene x Anzahl der Datensätze" anzeigt. Überprüfen Sie die Einstellungen request.timeout.ms und max.block.ms für Ihren Producer. Standardmäßig ist request.timeout.ms 60 Sekunden lang – Shades88
request.timeout.ms ist 30000 und max.block.ms ist 60000. Ich werde versuchen, diese zu verringern. (Wenn interaktiv gearbeitet wird, können 30 Sekunden auch unbestimmt sein - mein Fehler.) –
OK, ja, das löst mein unmittelbares Problem; Danke! –