2013-01-22 4 views
7

Ich bin auf der Suche nach einer soliden Implementierung, die es mir erlaubt, schrittweise eine Liste von Elementen mit Queue zu bearbeiten.Objekte progressiv aufnehmen, sobald eine Warteschlange verfügbar ist

Die Idee ist, dass ich eine bestimmte Anzahl von Arbeitern verwenden möchte, die durch eine Liste von 20+ datenbankintensiven Aufgaben gehen und das Ergebnis zurückgeben. Ich möchte, dass Python mit den fünf ersten Elementen beginnt und sobald es mit einer Aufgabe fertig ist, beginnt es mit der nächsten Aufgabe in der Warteschlange.

So mache ich es derzeit ohne Threading.

for key, v in self.sources.iteritems(): 
    # Do Stuff 

Ich möchte einen ähnlichen Ansatz haben, aber möglicherweise ohne die Liste in Untergruppen von fünf aufzuteilen. Damit wird automatisch das nächste Element in der Liste aufgenommen. Das Ziel ist es, sicherzustellen, dass, wenn eine Datenbank den Prozess verlangsamt, es keine negativen Auswirkungen auf die gesamte Anwendung haben wird.

Antwort

5

Sie können dies selbst implementieren, aber Python 3 kommt bereits mit einer Executor -basierten Lösung für die Threadverwaltung, die Sie in Python 2.x verwenden können, indem Sie installieren.

Ihr Code könnte dann folgendermaßen aussehen

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 
    future_to_key = {} 
    for key, value in sources.items(): 
     future_to_idday[executor.submit(do_stuff, value)] = key 
    for future in concurrent.futures.as_completed(future_to_key): 
     key = future_to_key[future] 
     result = future.result() 
     # process result 
+0

Dank. Ich werde das ausprobieren. Vergessen zu erwähnen, dass ich 2.x benutzt habe. – eandersson

3

Wenn Sie python3 verwenden, ich das gleichzeitige Futures Modul empfehlen. Wenn Sie python3 nicht verwenden und nicht mit Threads (im Gegensatz zu Prozessen) verbunden sind, dann versuchen Sie vielleicht multiprocessing.Pool (obwohl es einige Vorbehalte gibt und ich Probleme mit Pools hatte, die in meinen Anwendungen nicht richtig geschlossen wurden). Wenn Sie in python2 Threads verwenden müssen, schreiben Sie vielleicht selbst Code - erstellen Sie 5 Threads, die Consumer-Funktionen ausführen, und schieben Sie die Aufrufe (Funktion + Argumente) iterativ in die Warteschlange, damit die Konsumenten sie finden und verarbeiten können.

+0

['multiprocessing.dummy' bietet auch die gleiche Schnittstelle] (http://stackoverflow.com/a/14461365/4279) mit Threads anstelle von Prozessen – jfs

1

Man kann es nur stdlib mit tun:

#!/usr/bin/env python 
from multiprocessing.dummy import Pool # use threads 

def db_task(key_value): 
    try: 
     key, value = key_value 
     # compute result.. 
     return result, None 
    except Exception as e: 
     return None, e 

def main(): 
    pool = Pool(5) 
    for result, error in pool.imap_unordered(db_task, sources.items()): 
     if error is None: 
      print(result) 

if __name__=="__main__": 
    main() 
Verwandte Themen