2013-04-29 4 views
15

Ich möchte etwas ähnlich wie executor.map, außer wenn ich über die Ergebnisse iterieren, ich möchte über sie in der Reihenfolge der Fertigstellung iterieren, z. Das Arbeitselement, das zuerst abgeschlossen wurde, sollte zuerst in der Iteration usw. erscheinen. Dies ist der Fall, wenn die Iteration blockiert wird, wenn noch nicht jedes einzelne Arbeitselement in der Sequenz beendet ist.Python `concurrent.futures`: Iterate auf Futures nach der Reihenfolge der Fertigstellung

Ich weiß, wie ich dies mithilfe von Warteschlangen implementieren kann, aber ich frage mich, ob es möglich ist, das futures Framework zu verwenden.

(I meist gebrauchte Thread-basierten Testamentsvollstrecker, so würde ich gern eine Antwort, die auf diese trifft, sondern eine allgemeine Antwort wäre auch willkommen.)

UPDATE: Danke für die Antworten! Kannst du mir bitte erklären, wie ich as_completed mit executor.map benutzen kann? executor.map ist das nützlichste und prägnanteste Werkzeug für mich bei der Verwendung von Futures, und ich würde ungern anfangen, Future Objekte manuell zu verwenden.

+0

Sie haben Glück! – damzam

Antwort

25

executor.map(), wie das Builtin map(), liefert nur Ergebnisse in der Reihenfolge der iterable, so können Sie leider nicht die Reihenfolge der Fertigstellung zu bestimmen, verwenden. concurrent.futures.as_completed() ist das, was Sie suchen - hier ist ein Beispiel:

import time 
import concurrent.futures 

times = [3, 1, 2] 

def sleeper(secs): 
    time.sleep(secs) 
    print('I slept for {} seconds'.format(secs)) 
    return secs 

# returns in the order given 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    print(list(executor.map(sleeper, times))) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [3, 1, 2] 

# returns in the order completed 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    futs = [executor.submit(sleeper, secs) for secs in times] 
    print([fut.result() for fut in concurrent.futures.as_completed(futs)]) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [1, 2, 3] 

Natürlich, wenn Sie eine Karte Schnittstelle verwenden müssen, könnten Sie Ihre eigene map_as_completed() Funktion erstellen, die die oben kapselt (vielleicht fügen Sie es zu einem subclassed Executor()), aber ich denke, Erstellen von Futures-Instanzen durch executor.submit() ist eine einfachere/sauberere Weg zu gehen (auch ermöglicht Ihnen, No-Args, Kwargs).

0

From python doc

concurrent.futures.as_completed(fs, timeout=None)¶ 

Gibt einen Iterator über die Zukunft Instanzen (möglicherweise durch verschiedene Executor Instanzen erstellt) durch fs gegeben, die Futures, da sie komplett (fertig ergibt oder wurden storniert). Alle Futures, die zuvor als as_completed() ausgeführt wurden, werden zuerst ausgegeben. Der zurückgegebene Iterator löst einen TimeoutError aus, wenn next() aufgerufen wird und das Ergebnis nach Timeout Sekunden aus dem ursprünglichen Aufruf as_completed() nicht verfügbar ist. Timeout kann ein Int oder Float sein. Wenn kein Timeout angegeben ist oder Keine, , gibt es keine Begrenzung für die Wartezeit.

Sie müssten Unterschied zwischen executor.map() und executor.submit() zu verstehen. Der erste bildet eine Funktion auf einen Vektor von Argumenten ab. Es ist sehr ähnlich zu map, aber Tasks asynchron starten. submit(func, arg) startet bei jedem Aufruf eine Aufgabe. In dieser Task wird func auf arg angewendet.

Hier ist ein Beispiel für die Verwendung as_completed() mit submit(), die ich auf Python 3 ausführen konnte.0

from concurrent import futures 
import urllib.request 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

def main(): 
    with futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_url = dict(
      (executor.submit(load_url, url, 60), url) 
      for url in URLS) 

     for future in futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      try: 
       print('%r page is %d bytes' % (
          url, len(future.result()))) 
      except Exception as e: 
       print('%r generated an exception: %s' % (
          url, e)) 

if __name__ == '__main__': 
    main() 

kein map() wird hier verwendet, Aufgaben laufen mit submit und as_completed()

gibt einen Iterator über die Zukunft Instanzen von fs gegeben, die Futures, da sie komplett (fertigen oder wurden gestrichen) ergibt .

Verwandte Themen