2012-08-07 5 views
22

Ich benutze Prozess und Warteschlange Multiprocessing. Ich beginne mehrere Funktionen parallel und die meisten verhalten sich gut: Sie beenden, ihre Ausgabe geht in ihre Warteschlange, und sie erscheinen als .is_alive() == False. Aber aus irgendeinem Grund verhalten sich einige Funktionen nicht. Sie zeigen immer .is_alive() == Richtig, auch nachdem die letzte Zeile in der Funktion (eine Druckanweisung mit der Angabe "Finished") abgeschlossen ist. Dies geschieht unabhängig von den Funktionen, die ich starte, auch wenn es nur eine gibt. Wenn sie nicht parallel laufen, verhalten sich die Funktionen gut und kehren normal zurück. Was Art der Sache könnte das Problem sein?Python-Multiprocessing: einige Funktionen kehren nicht zurück, wenn sie abgeschlossen sind (Queue-Material zu groß)

Hier ist die generische Funktion, die ich verwende, um die Jobs zu verwalten. Alles, was ich nicht zeige, sind die Funktionen, die ich ihm gebe. Sie sind lang, benutzen oft Matplotlib, manchmal starten sie Shell-Befehle, aber ich kann nicht herausfinden, was die Fehler haben.

def runFunctionsInParallel(listOf_FuncAndArgLists): 
    """ 
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order. 
    """ 
    from multiprocessing import Process, Queue 

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue 
     print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name 
     que.put(fff(*theArgs)) #we're putting return value into queue 
     print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name 
     # We get this far even for "bad" functions 
     return 

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function 
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)] 
    for job in jobs: job.start() # Launch them all 
    import time 
    from math import sqrt 
    n=1 
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates 
     n+=1 
     time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs. 
     print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------') 
     print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)])) 
     print('---------------------------------------------------\n') 
    # I never get to the following line when one of the "bad" functions is running. 
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues? 
    # And now, collect all the outputs: 
    return([queue.get() for queue in queues]) 
+1

Komplette Aufnahme im Dunkeln: Führen die, die hängen, einen Wert zurück? (wörtlich, haben sie "Rückkehr" in ihnen?) – Logan

+0

Alle Funktionen, gut und schlecht, geben eine einzelne (lange) Zeichenfolge zurück. – CPBL

+0

Wenn ich jedoch die Verwendung von Warteschlangen eliminiere, verschwindet das Problem. Also ... eine Warteschlange wurde gefüllt. Ich kann es sehen, und es sieht gut aus, aber irgendwie ist der Job nicht beendet, wenn es eine zugehörige Warteschlange gibt (und nur für "schlechte" Funktionen). – CPBL

Antwort

14

In Ordnung, es scheint, dass das Rohr die Queue zu füllen verwendet gesteckt wird, wenn die Ausgabe einer Funktion zu groß ist (mein rohes Verständnis? Dies ist ein ungelöster/geschlossen Bug? http://bugs.python.org/issue8237). Ich habe den Code in meiner Frage geändert, so dass es einige Pufferung gibt (Warteschlangen werden regelmäßig geleert, während Prozesse laufen), die alle meine Probleme löst. Nun nimmt dies eine Sammlung von Aufgaben (Funktionen und ihre Argumente), startet sie und sammelt die Ausgaben. Ich wünschte, es wäre einfacher/sauberer aussehen.

Bearbeiten (2014 Sep; Update 2017 Nov: umgeschrieben für Lesbarkeit): Ich aktualisiere den Code mit den Verbesserungen, die ich seitdem gemacht habe. Der neue Code (gleiche Funktion, aber bessere Funktionen) ist hier: https://github.com/cpbl/cpblUtilities/blob/master/parallel.py

Die aufrufende Beschreibung ist auch unten.

def runFunctionsInParallel(*args, **kwargs): 
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments. 
    """ 
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs() 

########################################################################################### 
### 
class cRunFunctionsInParallel(): 
    ### 
    ####################################################################################### 
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel. 
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied. 
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name. 
Parameters 
---------- 
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs], 
    specifying the set of functions to be launched in parallel. If an 
    element is just a function, rather than a list, then it is assumed 
    to have no arguments or keyword arguments. Thus, possible formats 
    for elements of the outer list are: 
     function 
     [function, list] 
     [function, list, dict] 
kwargs: dict 
    One can also supply the kwargs once, for all jobs (or for those 
    without their own non-empty kwargs specified in the list) 
names: an optional list of names to identify the processes. 
    If omitted, the function name is used, so if all the functions are 
    the same (ie merely with different arguments), then they would be 
    named indistinguishably 
offsetsSeconds: int or list of ints 
    delay some functions' start times 
expectNonzeroExit: True/False 
    Normal behaviour is to not proceed if any function exits with a 
    failed exit code. This can be used to override this behaviour. 
parallel: True/False 
    Whenever the list of functions is longer than one, functions will 
    be run in parallel unless this parameter is passed as False 
maxAtOnce: int 
    If nonzero, this limits how many jobs will be allowed to run at 
    once. By default, this is set according to how many processors 
    the hardware has available. 
showFinished : int 
    Specifies the maximum number of successfully finished jobs to show 
    in the text interface (before the last report, which should always 
    show them all). 
Returns 
------- 
Returns a tuple of (return codes, return values), each a list in order of the jobs provided. 
Issues 
------- 
Only tested on POSIX OSes. 
Examples 
-------- 
See the testParallel() method in this module 
    """ 
+1

"Wenn dies nicht funktioniert, ist das Material, das Sie von Ihren Funktionen zurückgeben, möglicherweise nicht pickbar und daher nicht in der Lage, die Warteschlangen ordnungsgemäß zu durchlaufen." Riesige Hilfe, ich hatte genau dieses Problem und wusste nicht, dass 'Multiprocessing' auf Beizen beruht, um Objekte zwischen Prozessen zu übergeben (einschließlich der Rückgabe von Ergebnissen). – Michael

+0

Nur ein Vorschlag, aber Sie sollten ein wenig Zeit investieren, um dies lesbar zu machen.Es gibt wahrscheinlich ein paar wirklich nützliche Dinge hier, aber es ist fast unmöglich zu sagen. –

+0

Ja, wir verwenden diese Funktion eine Tonne, um große Wirkung. Ich weiß vielleicht nicht, wie man es lesbar macht, aber ich werde es noch einmal versuchen. Vielen Dank. https://github.com/cpbl/cpblUtilities/issues/10 – CPBL

Verwandte Themen