2015-04-07 4 views
14

Ich verbinde mich mit Kafka über die 0.8.2.1 kafka-clients-Bibliothek. Ich bin in der Lage, erfolgreich eine Verbindung herzustellen, wenn Kafka aktiv ist, aber ich möchte einen Fehler beheben, wenn Kafka nicht verfügbar ist. Hier ist meine Konfiguration:Wie kann ich einen Kafka-Ausfall elegant bewältigen?

kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl); 
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3"); 
producer = new KafkaProducer(kafkaProperties); 

Wenn Kafka nach unten ist, bekomme ich die folgenden Fehler in meinem Logs:

WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused 
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75] 
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75] 
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] 

Dieser Fehler wiederholt in einer Endlosschleife und sperrt meine Java-Anwendung. Ich habe verschiedene Konfigurationseinstellungen in Bezug auf Timeouts, Wiederholungen und Bestätigungen ausprobiert, aber ich konnte nicht verhindern, dass diese Schleife auftritt.

Gibt es eine Konfigurationseinstellung, die dies verhindern kann? Muss ich eine andere Version des Clients ausprobieren? Wie kann ein Kafka-Ausfall elegant gehandhabt werden?

Antwort

20

Ich fand heraus, dass diese Kombination von Einstellungen der kafka-Client ermöglicht es schnell zum Scheitern verurteilt, ohne den Faden oder Spamming die Protokolle zu halten:

kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300"); 
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300"); 
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000"); 
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000"); 

Ich mag nicht, dass der kafka Client den Faden hält bei dem Versuch, die verbinden Kafka-Server, anstatt vollständig async, aber das ist zumindest funktional.

+0

Sehr hilfreich, danke. Ist das offiziell überall dokumentiert? – maxenglander

+0

@maxenglander: Nicht dass ich gesehen habe. Ich musste durch Code stochern und es gab viel Versuch und Irrtum. –

+0

"anstatt vollständig asynchron zu sein" – vbence

0

In dem 0.9-Client gibt es auch die max.block.ms-Eigenschaft, die die Zeit begrenzen wird, die der Client ausgeführt werden darf.

Verwandte Themen