2

Ich verwende Python (2.7) Multiprocessing, um Daten in Kafka-Warteschlange mit Kafka-Python (1.3.5) KafkaProducer zu pushen.Daten werden nicht mit Python-Multiprocessing an Kafka-Warteschlange gesendet

from kafka import KafkaProducer 
import multiprocessing 
# other imports 


class TestClass(object): 
    def __init__(self, producer): 
     self.kafka_producer = producer 

    def main(self, conf, nthreads): 
     try: 
      for i in range(nthreads): 
       logger.info("Starting process number = %d " % (i + 1)) 
       p = Process(target=self.do_some_task, args=(conf, 2)) 
       p.start() 
       processes.append(p) 
      for p in processes: 
       logger.info("Joining process") 
      p.join() 
     except Exception, ex: 
      logger.error("Exception occurred : %s" % str(ex)) 

    def do_some_task(conf, retry): 
     # some task happening 
     self.record(arg1, arg2) 

    # pushing to kafka 
    def record(self, arg1, arg2) 
     message = json.dumps({"a": "arg1", "b": "arg2"}) 
     self.kafka_producer.send(KAFKA_TOPIC, message) 


if __name__ == '__main__': 
    kafka_producer = KafkaProducer(
     bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, 
     request_timeout_ms=60000, 
     retries=2) 
    obj = TestClass(kafka_producer) 

    try: 
     parser = argparse.ArgumentParser(description='Description') 
     parser.add_argument('-threads', type=int, default=1) # 20 threads 
     parser.add_argument('-debug', type=int, default=0) 
     args = parser.parse_args() 
     me = SingleInstance(args.src) 
     TestClass.main(CONF[args.src], args.threads) 

20 Fäden hervorgebracht, innerhalb dessen Kafka schreiben. Ich sah die Logs und fand heraus, dass der Prozess darauf wartet, dass die Nachricht in kafka geschrieben wird, aber irgendwann geht es weiter, ohne Kafka zu schreiben. Es gibt keine Ausnahme.

Ich habe versucht, den gleichen Code ohne Threads von Python-Befehlszeile ausgeführt und alles funktioniert wie erwartet. Was kann das Problem sein?

Antwort

0

Bitte spawnen Sie die Verbindungen zu kafka nach den Forking-Prozessen. Und schließen Sie bitte die Verbindung und verbinden Sie sich erneut, wenn Sie auf verbindungsbezogene Fehler stoßen.

Verwandte Themen