2012-12-20 5 views
6

Ich benutze das Multiprocessing-Modul, um eine sehr große Aufgabe aufzuteilen. Es funktioniert größtenteils, aber ich muss etwas Offensichtliches mit meinem Design vermissen, denn auf diese Weise ist es sehr schwer für mich, effektiv zu sagen, wann alle Daten verarbeitet wurden.Multiprocessing - Producer/Consumer Design

Ich habe zwei separate Aufgaben, die ausgeführt werden; einer, der den anderen füttert. Ich denke, das ist ein Produzenten-/Konsumentenproblem. Ich verwende eine geteilte Warteschlange zwischen allen Prozessen, wobei die Produzenten die Warteschlange füllen und die Konsumenten aus der Warteschlange lesen und die Verarbeitung durchführen. Das Problem ist, dass es eine endliche Datenmenge gibt. Irgendwann muss jeder wissen, dass alle Daten verarbeitet wurden, damit das System ordnungsgemäß heruntergefahren werden kann.

Es scheint sinnvoll zu sein, die map_async() -Funktion zu verwenden, aber da die Produzenten die Warteschlange füllen, kenne ich nicht alle Punkte im Voraus, also muss ich in eine while-Schleife gehen und benutze apply_async() und versuche zu erkennen, wann alles mit einer Art Timeout fertig ist ... hässlich.

Ich fühle mich, als ob ich etwas Offensichtliches vermisse. Wie kann das besser gestaltet werden?

PRODCUER

class ProducerProcess(multiprocessing.Process): 
    def __init__(self, item, consumer_queue): 
     self.item = item 
     self.consumer_queue = consumer_queue 
     multiprocessing.Process.__init__(self) 

    def run(self): 
     for record in get_records_for_item(self.item): # this takes time 
      self.consumer_queue.put(record) 

def start_producer_processes(producer_queue, consumer_queue, max_running): 
    running = [] 

    while not producer_queue.empty(): 
     running = [r for r in running if r.is_alive()] 
     if len(running) < max_running: 
      producer_item = producer_queue.get() 
      p = ProducerProcess(producer_item, consumer_queue) 
      p.start() 
      running.append(p) 
     time.sleep(1) 

CONSUMER

def process_consumer_chunk(queue, chunksize=10000): 
    for i in xrange(0, chunksize): 
     try: 
      # don't wait too long for an item 
      # if new records don't arrive in 10 seconds, process what you have 
      # and let the next process pick up more items. 

      record = queue.get(True, 10) 
     except Queue.Empty:     
      break 

     do_stuff_with_record(record) 

MAIN

if __name__ == "__main__": 
    manager = multiprocessing.Manager() 
    consumer_queue = manager.Queue(1024*1024) 
    producer_queue = manager.Queue() 

    producer_items = xrange(0,10) 

    for item in producer_items: 
     producer_queue.put(item) 

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8)) 
    p.start() 

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1) 

Hier ist, wo es kitschig wird. Ich kann map nicht verwenden, da die zu konsumierende Liste gleichzeitig gefüllt wird. Also muss ich in eine While-Schleife gehen und versuchen, eine Zeitüberschreitung zu erkennen. Die consumer_queue kann leer werden, während die Produzenten noch versuchen, sie zu füllen, damit ich nicht einfach eine leere Queue aufspüren kann.

timed_out = False 
    timeout= 1800 
    while 1: 
     try: 
      result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,)) 
      if timed_out: 
       timed_out = False 

     except Queue.Empty: 
      if timed_out: 
       break 

      timed_out = True 
      time.sleep(timeout) 
     time.sleep(1) 

    consumer_queue.join() 
    consumer_pool.close() 
    consumer_pool.join() 

Ich dachte, dass vielleicht könnte ich() die Datensätze im Hauptthread bekommen und stattdessen die Warteschlange in der Weitergabe an Personen in die Verbraucher weitergeben, aber ich denke, dass ich mit dem gleichen Problem auf diese Weise enden. Ich muss noch eine while-Schleife laufen lassen und apply_async() verwenden. Vielen Dank im Voraus für jeden Hinweis!

Antwort

2

Sie könnten eine manager.Event verwenden, um das Ende der Arbeit zu signalisieren. Dieses Ereignis kann zwischen all Ihren Prozessen ausgetauscht werden, und wenn Sie es dann aus Ihrem Hauptprozess signalisieren, können die anderen Mitarbeiter dann ordnungsgemäß heruntergefahren werden.

while not event.is_set(): 
...rest of code... 

Ihre Kunden würden also auf das Setzen des Ereignisses warten und die Bereinigung übernehmen, sobald sie eingestellt ist.

Um festzustellen, wann dieses Flag gesetzt werden soll, können Sie einen join auf den Producer-Threads tun, und wenn diese alle abgeschlossen sind, können Sie dann auf den Consumer-Threads beitreten.

+0

Ich denke, das wird funktionieren. Vielen Dank! Ich bin mir nicht sicher, wie die join() aus Ihrer Beschreibung heraus funktioniert, aber ich denke, ich habe einen Weg gefunden. Ich übergebe das Ereignis an den Prozess start_producer_process() und setze es(), nachdem alle Produzenten die consumer_queue hinzugefügt haben. An diesem Punkt (zurück im Haupt-Thread), wenn die consumer_queue leer ist, bedeutet dies, dass alles verarbeitet wurde, so dass ich sicher aus der while-Schleife ausbrechen kann. – user1914881

+0

Sorry für den verwirrenden Teil, der Join wäre im Hauptthread, so dass du das Programm nicht einfach beenden würdest, nachdem deine Produzenten fertig waren und die Konsumenten gerade angefangen hatten, ihre Arbeit zu machen. – sean

0

Ich würde dringend empfehlen SimPy anstelle Multiprozess/Threading zu tun diskrete Ereignissimulation.