Mein Ziel ist es, Daten aus Nicht-Dateiquellen (d. H. Innerhalb eines Programms generiert oder über eine API gesendet) zu erhalten und an einen Sparkstream zu senden. Um dies zu erreichen, ich schicke die Daten durch eine python-basedKafkaProducer
:Erhalten von Nachrichten von Python gesendet KafkaProducer
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
$ python
Python 3.6.1| Anaconda custom (64-bit)
> from kafka import KafkaProducer
> import time
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time())
> producer.close()
> exit()
Mein Problem ist, dass nichts angezeigt wird, wenn das Thema aus dem Consumer-Shell-Skript Überprüfung:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic my-topic
^C$
Ist etwas fehlt oder falsch Hier? Ich bin neu bei Spark/Kafka/Messaging-Systemen, also wird alles helfen. Die Kafka-Version ist 0.11.0.0 (Scala 2.11) und es werden keine Änderungen an den Konfigurationsdateien vorgenommen.
ich, es ist kompatibel sagt hinzugefügt 'von-beginning' aber das Ergebnis war das Gleiche. Ich habe auch kafkacat installiert, meine Schritte neu gemacht und den Befehl ausgeführt, aber die Nachricht wurde immer noch nicht gefunden. – user2361174
@ user2361174 nur Ihr Beispiel überprüft, und es scheint, als würde der Produzent nichts senden wegen 'timestamp_ms = time.time()' - wenn Debug-Logging einschalten, wird die folgende Meldung im Log erscheinen: 'DEBUG: kafka. producer.kafka: Während der Nachricht wurde eine Ausnahme gesendet: '. Wahrscheinlich gibt 'time.time()' den Zeitstempel in dem Format zurück, das für den Produzenten unerwartet ist ... Das Entfernen dieser Option sollte also den Trick machen, das heißt 'producer.send (topic = 'mein-Thema', value = 'MELDUNG BESTÄTIGT ') '(der aktuelle Zeitstempel wird standardmäßig verwendet). –
xscratt
Ich habe den Zeitstempel los, obwohl im Consumer oder im Kafkacat-Kommando immer noch nichts zu sehen ist. Ich habe die Befehlszeilenausgabe hier gespeichert: https://raw.githubusercontent.com/dretta/spark/master/kafka.log – user2361174