Ich benutze das Multiprocessing-Modul, um eine sehr große Aufgabe aufzuteilen. Es funktioniert größtenteils, aber ich muss etwas Offensichtliches mit meinem Design vermissen, denn auf diese Weise ist es sehr schwer für mich, effektiv zu sagen, wann alle Daten verarbeitet wurden.Multiprocessing - Producer/Consumer Design
Ich habe zwei separate Aufgaben, die ausgeführt werden; einer, der den anderen füttert. Ich denke, das ist ein Produzenten-/Konsumentenproblem. Ich verwende eine geteilte Warteschlange zwischen allen Prozessen, wobei die Produzenten die Warteschlange füllen und die Konsumenten aus der Warteschlange lesen und die Verarbeitung durchführen. Das Problem ist, dass es eine endliche Datenmenge gibt. Irgendwann muss jeder wissen, dass alle Daten verarbeitet wurden, damit das System ordnungsgemäß heruntergefahren werden kann.
Es scheint sinnvoll zu sein, die map_async() -Funktion zu verwenden, aber da die Produzenten die Warteschlange füllen, kenne ich nicht alle Punkte im Voraus, also muss ich in eine while-Schleife gehen und benutze apply_async() und versuche zu erkennen, wann alles mit einer Art Timeout fertig ist ... hässlich.
Ich fühle mich, als ob ich etwas Offensichtliches vermisse. Wie kann das besser gestaltet werden?
PRODCUER
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
CONSUMER
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
MAIN
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
Hier ist, wo es kitschig wird. Ich kann map nicht verwenden, da die zu konsumierende Liste gleichzeitig gefüllt wird. Also muss ich in eine While-Schleife gehen und versuchen, eine Zeitüberschreitung zu erkennen. Die consumer_queue kann leer werden, während die Produzenten noch versuchen, sie zu füllen, damit ich nicht einfach eine leere Queue aufspüren kann.
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
Ich dachte, dass vielleicht könnte ich() die Datensätze im Hauptthread bekommen und stattdessen die Warteschlange in der Weitergabe an Personen in die Verbraucher weitergeben, aber ich denke, dass ich mit dem gleichen Problem auf diese Weise enden. Ich muss noch eine while-Schleife laufen lassen und apply_async() verwenden. Vielen Dank im Voraus für jeden Hinweis!
Ich denke, das wird funktionieren. Vielen Dank! Ich bin mir nicht sicher, wie die join() aus Ihrer Beschreibung heraus funktioniert, aber ich denke, ich habe einen Weg gefunden. Ich übergebe das Ereignis an den Prozess start_producer_process() und setze es(), nachdem alle Produzenten die consumer_queue hinzugefügt haben. An diesem Punkt (zurück im Haupt-Thread), wenn die consumer_queue leer ist, bedeutet dies, dass alles verarbeitet wurde, so dass ich sicher aus der while-Schleife ausbrechen kann. – user1914881
Sorry für den verwirrenden Teil, der Join wäre im Hauptthread, so dass du das Programm nicht einfach beenden würdest, nachdem deine Produzenten fertig waren und die Konsumenten gerade angefangen hatten, ihre Arbeit zu machen. – sean