1

Ich habe diese Module (alle lokal) ausgeführt wird:Kann nicht an Kafka aus Python-Code erzeugen

  • ZooKeeper
  • Kafka Server
  • Kafka Consumer
  • Python-Skript

In der Skript gibt es eine send() Anruf:

producer = KafkaProducer(bootstrap_servers=['localhost:9092']) 
producer.send('test', 'entry1') 

alle 15 Sekunden oder so. Fast jeder send() Aufruf Renditen am Kafka Server diese Fehler:

[2017-10-16 18:59:10,953] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) 
kafka.common.KafkaException: Wrong request type 16 
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64) 
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) 
at kafka.network.Processor.read(SocketServer.scala:450) 
at kafka.network.Processor.run(SocketServer.scala:340) 
at java.lang.Thread.run(Thread.java:748) 
[2017-10-16 18:59:11,158] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2017-10-16 18:59:11,162] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2017-10-16 18:59:11,162] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) 
kafka.common.KafkaException: Wrong request type 18 
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64) 
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) 
at kafka.network.Processor.read(SocketServer.scala:450) 
at kafka.network.Processor.run(SocketServer.scala:340) 
at java.lang.Thread.run(Thread.java:748) 

Es ist fast ein, denn alle 5 Menuette (mehr oder weniger) der Eintrag für den Verbraucher Recht hat, aber dann wieder die Fehler.

Danke für die Hilfe

+0

Welchen Kafka-Client benutzen Sie und welche Version von Kafka? Hast du das hier probiert? https://github.com/conflutinc/confluent-kafka-python –

Antwort

0

Es ist wie Version von Kafka aussieht, ist nicht mit der Version von Python-Client kompatibel - bitten sie überprüfen. Die Liste der Codes für Operationen könnte here gefunden werden.

+0

Das könnte der Grund sein, der Spark mit Kafka auf Python hat alle Arten von Versionsbeschränkung. Zur Zeit habe ich mich entschieden, Kafka nicht zu benutzen. Wenn ich mich entscheide, sie erneut in meine Architektur einzugeben, werde ich diese Tipps verwenden und das Problem aktualisieren. –

Verwandte Themen