Ich verwende Python (2.7) Multiprocessing, um Daten in Kafka-Warteschlange mit Kafka-Python (1.3.5) KafkaProducer zu pushen.Daten werden nicht mit Python-Multiprocessing an Kafka-Warteschlange gesendet
from kafka import KafkaProducer
import multiprocessing
# other imports
class TestClass(object):
def __init__(self, producer):
self.kafka_producer = producer
def main(self, conf, nthreads):
try:
for i in range(nthreads):
logger.info("Starting process number = %d " % (i + 1))
p = Process(target=self.do_some_task, args=(conf, 2))
p.start()
processes.append(p)
for p in processes:
logger.info("Joining process")
p.join()
except Exception, ex:
logger.error("Exception occurred : %s" % str(ex))
def do_some_task(conf, retry):
# some task happening
self.record(arg1, arg2)
# pushing to kafka
def record(self, arg1, arg2)
message = json.dumps({"a": "arg1", "b": "arg2"})
self.kafka_producer.send(KAFKA_TOPIC, message)
if __name__ == '__main__':
kafka_producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
request_timeout_ms=60000,
retries=2)
obj = TestClass(kafka_producer)
try:
parser = argparse.ArgumentParser(description='Description')
parser.add_argument('-threads', type=int, default=1) # 20 threads
parser.add_argument('-debug', type=int, default=0)
args = parser.parse_args()
me = SingleInstance(args.src)
TestClass.main(CONF[args.src], args.threads)
20 Fäden hervorgebracht, innerhalb dessen Kafka schreiben. Ich sah die Logs und fand heraus, dass der Prozess darauf wartet, dass die Nachricht in kafka geschrieben wird, aber irgendwann geht es weiter, ohne Kafka zu schreiben. Es gibt keine Ausnahme.
Ich habe versucht, den gleichen Code ohne Threads von Python-Befehlszeile ausgeführt und alles funktioniert wie erwartet. Was kann das Problem sein?