Ich benutze Prozess und Warteschlange Multiprocessing. Ich beginne mehrere Funktionen parallel und die meisten verhalten sich gut: Sie beenden, ihre Ausgabe geht in ihre Warteschlange, und sie erscheinen als .is_alive() == False. Aber aus irgendeinem Grund verhalten sich einige Funktionen nicht. Sie zeigen immer .is_alive() == Richtig, auch nachdem die letzte Zeile in der Funktion (eine Druckanweisung mit der Angabe "Finished") abgeschlossen ist. Dies geschieht unabhängig von den Funktionen, die ich starte, auch wenn es nur eine gibt. Wenn sie nicht parallel laufen, verhalten sich die Funktionen gut und kehren normal zurück. Was Art der Sache könnte das Problem sein?Python-Multiprocessing: einige Funktionen kehren nicht zurück, wenn sie abgeschlossen sind (Queue-Material zu groß)
Hier ist die generische Funktion, die ich verwende, um die Jobs zu verwalten. Alles, was ich nicht zeige, sind die Funktionen, die ich ihm gebe. Sie sind lang, benutzen oft Matplotlib, manchmal starten sie Shell-Befehle, aber ich kann nicht herausfinden, was die Fehler haben.
def runFunctionsInParallel(listOf_FuncAndArgLists):
"""
Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
"""
from multiprocessing import Process, Queue
def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
que.put(fff(*theArgs)) #we're putting return value into queue
print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
# We get this far even for "bad" functions
return
queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
for job in jobs: job.start() # Launch them all
import time
from math import sqrt
n=1
while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
n+=1
time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
print('---------------------------------------------------\n')
# I never get to the following line when one of the "bad" functions is running.
for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
# And now, collect all the outputs:
return([queue.get() for queue in queues])
Komplette Aufnahme im Dunkeln: Führen die, die hängen, einen Wert zurück? (wörtlich, haben sie "Rückkehr" in ihnen?) – Logan
Alle Funktionen, gut und schlecht, geben eine einzelne (lange) Zeichenfolge zurück. – CPBL
Wenn ich jedoch die Verwendung von Warteschlangen eliminiere, verschwindet das Problem. Also ... eine Warteschlange wurde gefüllt. Ich kann es sehen, und es sieht gut aus, aber irgendwie ist der Job nicht beendet, wenn es eine zugehörige Warteschlange gibt (und nur für "schlechte" Funktionen). – CPBL