2016-07-13 4 views
1

Ich benutze Multiprocessing in Python mit dem pool.apply_async, um gleichzeitig eine Funktion mit verschiedenen Argumenten auszuführen.Ändern Sie die Art und Weise, in der Pythons Multiprocessing Fehler verursacht

Der relevante Auszug des Codes ist:

import multiprocessing as mp 

all_details_to_process_full = [[x,y.z], [x2,y2.z2]] 

def loop_over_desired_sub(arg_list): 
    ... 

if __name__ == '__main__': 
    pool = mp.Pool(processes=10) 
    desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,)) for arg_list in all_details_to_process_full]  
    results = [p.get() for p in desired_content] 

Soweit ich das beurteilen kann, für Python des Standardverhalten nur den Code zu stoppen, wenn ein Fehler durch die früheste subprocess angehoben wird, die initialisiert wurde.

Wenn beispielsweise 10 Elemente in der Liste mit separaten Unterprozessen verarbeitet werden und ein Fehler bei der Verarbeitung des ersten Elements auftritt (dh des ersten untergeordneten Prozesses), wird Python sofort den Fehler auslösen und den Fehler beheben Code. Wenn jedoch ein Fehler in dem zweiten Unterprozess auftritt, während dieser Unterprozess stoppt, wird der Rest des Codes fortgesetzt, bis der erste Eintrag beendet ist, zu welchem ​​Zeitpunkt der Fehler ausgelöst wird und der Code stoppt. [Wenn ein Fehler beim Verarbeiten des dritten Elements auftritt, müssen beide Elemente eins und zwei beendet werden, bevor der Fehler ausgelöst wird].

Gibt es eine Möglichkeit, dieses Verhalten zu ändern, sowohl für:

  1. Jeder Fehler ausgelöst, das heißt in einem der Teilprozess, um den Code zu stoppen sofort

  2. Der Code nicht zu stoppen wenn es wird ein Fehler ausgelöst, bis alle subprocessed haben

    fertig

Antwort

2

Jeder Ihrer Prozess sind hier unabhängig wie Sie apply_async verwenden. Daher ist das Standardverhalten von Python, sie unabhängig zu verarbeiten, was bedeutet, dass ein Fehler den anderen nicht beeinträchtigt. Das Problem besteht darin, dass Sie die Ergebnisse Ihrer Funktion loop_over_desired_content in einer geordneten Weise verarbeiten. Die get-Methode wird blockiert, bis das Ergebnis der ersten Operation abgerufen wird (auch wenn der zweite Prozess zurückgegeben wurde/fehlgeschlagen ist). Dann verarbeitet es den 2. Wert und löst bei Bedarf einen Fehler aus.

import multiprocessing as mp 
import time 


def fail_in(args): 
    x, l = args 
    if x == l: 
     raise RuntimeError(x) 
    time.sleep(.5) 
    print("Finish process {}".format(x)) 
    return x 


if __name__ == '__main__': 
    pool = mp.Pool(processes=3) 
    tasks = [(i, 0) for i in range(9)] 

    try: 
     desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks] 
     t1 = time.time() 
     results = [p.get() for p in desired_content] 
    except RuntimeError: 
     print("apply_async 0 failed in {:4.2}s".format(time.time()-t1)) 
    pool.terminate() 
    pool = mp.Pool(processes=3) 
    tasks = [(i, 1) for i in range(9)] 

    try: 
     desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks] 
     t1 = time.time() 
     results = [p.get() for p in desired_content] 
    except RuntimeError: 
     print("apply_async 1 failed in {:4.2}s".format(time.time()-t1)) 
    pool.terminate() 
    pool = mp.Pool(processes=3) 
    tasks = [(i, 4) for i in range(9)] 

    try: 
     desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks] 
     t1 = time.time() 
     results = [p.get() for p in desired_content] 
    except RuntimeError: 
     print("apply_async 4 failed in {:4.2}s".format(time.time()-t1)) 
    pool.terminate() 

Beachten Sie, dass der verbleibende Prozess durch diesen Fehler nicht beendet wird. Sie können es sehen, indem Sie versuchen, einen neuen Job im Pool zu senden, ohne terminate zu verwenden. Sie werden gestartet, nachdem alle verbleibenden Prozesse von Ihrem vorherigen Job abgeschlossen sind.

Um eine schnellere Fehlerbenachrichtigung zu erhalten, können Sie die Methode imap_unordered verwenden, die einen Fehler auslöst, sobald ein Fehler zurückgegeben wird. Sie müssen vorsichtig sein, da Sie Job_id verwenden müssen, um die Bestellung zu finden.
Sie könnten auch die Benachrichtigung mit dem callback_error erhalten, um in diesem Fall eine Bereinigung durchzuführen.

Für die 2. behvior, fordern alle das Ergebnis Prozess zu sein, bevor der Fehler erhöhen, können Sie einfach verwenden:

desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,)) 
        for arg_list in all_details_to_process_full] 
results = [] 
for p in desired_content: 
    try: 
     r = p.get() 
    except Exception as r: 
     pass 
    results += [r] 

results = [p.get() for p in desired_content] 
Verwandte Themen