2016-11-18 1 views
0

Also ich versuche, eine generische Lösung zu erarbeiten, die alle Werte von einer Funktion sammelt und sie an eine Liste anfügt, die später zugänglich ist. Dies ist während concurrent.futures oder threading Art Aufgaben zu verwenden. Hier ist eine Lösung, die ich unter Verwendung eines globalen master_list:Wie können Funktionsrückgabewerte während des Multithreading gesammelt werden, ohne globale Variablen zu verwenden?

from concurrent.futures import ThreadPoolExecutor 

master_list = [] 
def return_from_multithreaded(func): 
    # master_list = [] 
    def wrapper(*args, **kwargs): 
     # nonlocal master_list 
     global master_list 
     master_list += func(*args, **kwargs) 
    return wrapper 


@return_from_multithreaded 
def f(n): 
    return [n] 


with ThreadPoolExecutor(max_workers=20) as exec: 
    exec.map(f, range(1, 100)) 

print(master_list) 

Ich möchte eine Lösung finden, die nicht Globals enthält, und vielleicht kann kehrt der Kommentar gesetzt master_list, die als Verschluss gespeichert?

Antwort

2

Wenn Sie möchten, verwerfen, nicht Globals verwenden, nicht die Ergebnisse von map. map gibt Ihnen die von jeder Funktion zurückgegebenen Werte zurück, Sie haben sie einfach ignoriert. Dieser Code könnte viel einfacher gemacht werden, indem map für den vorgesehenen Zweck:

def f(n): 
    return n # No need to wrap in list 

with ThreadPoolExecutor(max_workers=20) as exec: 
    master_list = list(exec.map(f, range(1, 100))) 

print(master_list) 

Wenn Sie ein master_list benötigen, die die bisherigen Ergebnisse berechnet zeigt (vielleicht einige andere Thread ist es zu beobachten), die Sie gerade die Schleife machen explizite :

def f(n): 
    return n # No need to wrap in list 

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    for result in exec.map(f, range(1, 100)): 
     master_list.append(result) 

print(master_list) 

Dafür ist das Executor-Modell vorgesehen; Normale Threads sind nicht dazu gedacht, Werte zurückzugeben, aber Executors haben einen Kanal zur Rückgabe von Werten unter den Deckblättern bereitgestellt, sodass Sie sie nicht selbst verwalten müssen. Intern verwendet dies Warteschlangen der einen oder anderen Art mit zusätzlichen Metadaten, um die Ergebnisse in Ordnung zu halten, aber Sie müssen sich nicht mit dieser Komplexität befassen. Aus Ihrer Sicht ist es gleichbedeutend mit der normalen map Funktion, es passiert nur zufällig, die Arbeit zu parallelisieren.


aktualisieren mit Ausnahmen Umgang abdecken:

map alle Ausnahmen in der Arbeiter erhöht erhöhen wird, wenn das Ergebnis getroffen wird. Daher wird, wie geschrieben, der erste Satz Code nichts speichern, wenn eine der Aufgaben fehlschlägt (die list wird teilweise konstruiert, aber weggeworfen, wenn die Ausnahme ausgelöst wird). Im zweiten Beispiel werden nur Ergebnisse gespeichert, bevor die erste Ausnahme ausgelöst wird. Der Rest wird verworfen (Sie müssen den map Iterator speichern und einen umständlichen Code verwenden, um dies zu vermeiden). Wenn Sie alle erfolgreichen Ergebnisse speichern müssen, Fehler ignorieren (oder sie nur auf irgendeine Weise protokollieren), ist es am einfachsten, submit Objekte list von Future zu erstellen, dann warten Sie auf sie, entweder seriell oder in der Reihenfolge der Fertigstellung, umhüllen die .result() Anrufe in try/except zu vermeiden, gute Ergebnisse zu verwerfen. Zum Beispiel der Vorlage speichern Ergebnisse um, dann würden Sie tun:

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    futures = [exec.submit(f, i) for i in range(1, 100)] 
    exec.shutdown(False) # Optional: workers terminate as soon as all futures finish, 
          # rather than waiting for all results to be processed 
    for fut in futures: 
     try: 
      master_list.append(fut.result()) 
     except Exception: 
      ... log error here ... 

für effizienteren Code, Sie Ergebnisse in der Reihenfolge der Fertigstellung abrufen können, nicht Vorlage, mit concurrent.futures.as_completed eifrig Ergebnisse abgerufen werden, da sie zu beenden.Die einzige Änderung von dem vorherigen Code ist, dass:

for fut in concurrent.futures.as_completed(futures): 

wo as_completed macht die Arbeit von yield ing, sobald sie vollständig abgeschlossen/abgebrochen Futures statt, bis alle Futures verzögern:

for fut in futures: 

wird früher eingereicht vollständig und behandelt werden.

Es gibt kompliziertere Optionen, die die Verwendung von add_done_callback beinhalten, so dass der Hauptthread überhaupt nicht explizit die Ergebnisse behandelt, aber das ist normalerweise unnötig und oft verwirrend, also sollte es am besten vermieden werden.

+0

+1 für das Teilen der schönen Information. Ich hatte einen Zweifel, wie verhält es sich, wenn die übergebene Funktion eine Ausnahme auslöst? Geht das damit um? –

+0

@Moinuddin In meiner Erfahrung, um Fehlerbehandlung mit ThreadPoolExcecutors anstelle von 'map' zu tun, verwenden Sie' submit', die eine Zukunft zurückgibt, und dann 'future.result()' aufrufen, nachdem es abgeschlossen ist. Dies wird alle gefangenen Ausnahmen auslösen. – flybonzai

+0

@flybonzai: Yar. Machen Sie eine "Liste" von 'Future's, dann, wenn die Reihenfolge der Ergebnisse wichtig ist, iterieren Sie einfach die' list' und rufen Sie 'result' sequentiell auf (in' try'/'except' verpackt), um im Arbeiter aufgeworfene Exceptions zu behandeln). Wenn die Reihenfolge der Ergebnisse keine Rolle spielt, verwenden Sie [concurrent.futures.as_completed'] (https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed), was den ' Zukünftige Objekte, wie sie fertig sind (erfolgreich oder aufgrund einer Ausnahme); Wiederum rufen Sie 'result' in einem' try'/'except'-Block auf, um Fehler zu behandeln. Letzteres ist in der Regel effizienter, wenn die Reihenfolge unwichtig ist. – ShadowRanger

2

Ich habe dieses Problem in der Vergangenheit konfrontiert: Running multiple asynchronous function and get the returned value of each function. Das war mein Ansatz, es zu tun:

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

Probelauf:

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2] 
Verwandte Themen