2017-07-08 3 views
0

Mein Ziel ist es, Daten aus Nicht-Dateiquellen (d. H. Innerhalb eines Programms generiert oder über eine API gesendet) zu erhalten und an einen Sparkstream zu senden. Um dies zu erreichen, ich schicke die Daten durch eine python-basedKafkaProducer:Erhalten von Nachrichten von Python gesendet KafkaProducer

$ bin/zookeeper-server-start.sh config/zookeeper.properties & 
$ bin/kafka-server-start.sh config/server.properties & 
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic 
$ python 
Python 3.6.1| Anaconda custom (64-bit) 
> from kafka import KafkaProducer 
> import time 
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8')) 
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time()) 
> producer.close() 
> exit() 

Mein Problem ist, dass nichts angezeigt wird, wenn das Thema aus dem Consumer-Shell-Skript Überprüfung:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic my-topic 
^C$ 

Ist etwas fehlt oder falsch Hier? Ich bin neu bei Spark/Kafka/Messaging-Systemen, also wird alles helfen. Die Kafka-Version ist 0.11.0.0 (Scala 2.11) und es werden keine Änderungen an den Konfigurationsdateien vorgenommen.

Antwort

0

Ich fand das Problem, das value_serializer stillschweigend bricht, weil ich das JSON-Modul nicht in den Interpreter importiert habe. Zwei Lösungen dafür, man importiert einfach das Modul und Sie erhalten "MESSAGE ACKNOWLEDGED" (mit Anführungszeichen) zurück. Oder Sie können value_serializer insgesamt entfernen und die value Zeichenfolge, die in der nächsten Zeile gesendet wird, in eine Bytezeichenfolge (d. H. b'MESSAGE ACKNOWLEDGED' für Python 3) konvertieren, sodass Sie die Nachricht ohne Anführungszeichen zurück erhalten.

Ich wechselte auch Kafka auf Version 0.10.2.1 (Scala 2.11), da in den Kafka-Python-Dokumenten keine Bestätigung gibt es, die mit der Version 0.11.0.0

1

Wenn Sie einen Verbraucher nach dem Senden von Nachrichten an ein Thema starten, ist es möglich, dass der Consumer diese Nachrichten überspringt, weil er einen Themenoffset (der als "Ausgangspunkt" zum Lesen angesehen werden kann) auf den Thema endet. Um dieses Verhalten zu ändern versuchen --from-beginning Option hinzuzufügen:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning 

Sie können auch kafkacat versuchen, die bequemer ist als Kafkas Konsole Konsument und Produzent (imho). Lesen von Nachrichten von Kafka mit kafkacat kann mit dem folgenden Befehl durchgeführt werden:

kafkacat -C -b 'localhost:9092' -o beginning -e -D '\n' -t 'my-topic' 

Hoffe, es hilft.

+0

ich, es ist kompatibel sagt hinzugefügt 'von-beginning' aber das Ergebnis war das Gleiche. Ich habe auch kafkacat installiert, meine Schritte neu gemacht und den Befehl ausgeführt, aber die Nachricht wurde immer noch nicht gefunden. – user2361174

+1

@ user2361174 nur Ihr Beispiel überprüft, und es scheint, als würde der Produzent nichts senden wegen 'timestamp_ms = time.time()' - wenn Debug-Logging einschalten, wird die folgende Meldung im Log erscheinen: 'DEBUG: kafka. producer.kafka: Während der Nachricht wurde eine Ausnahme gesendet: '. Wahrscheinlich gibt 'time.time()' den Zeitstempel in dem Format zurück, das für den Produzenten unerwartet ist ... Das Entfernen dieser Option sollte also den Trick machen, das heißt 'producer.send (topic = 'mein-Thema', value = 'MELDUNG BESTÄTIGT ') '(der aktuelle Zeitstempel wird standardmäßig verwendet). – xscratt

+0

Ich habe den Zeitstempel los, obwohl im Consumer oder im Kafkacat-Kommando immer noch nichts zu sehen ist. Ich habe die Befehlszeilenausgabe hier gespeichert: https://raw.githubusercontent.com/dretta/spark/master/kafka.log – user2361174

Verwandte Themen