2012-08-16 13 views
18

Ich möchte mehrere Instanzen von program.py gleichzeitig ausführen und gleichzeitig die Anzahl der Instanzen begrenzen, die gleichzeitig ausgeführt werden (z. B. die Anzahl der auf meinem System verfügbaren CPU-Kerne)). Zum Beispiel, wenn ich 10 Kerne habe und 1000 Läufe von program.py insgesamt machen muss, werden nur 10 Instanzen erstellt und laufen zu jeder gegebenen Zeit.Multiprocessing in Python bei gleichzeitiger Begrenzung der Anzahl laufender Prozesse

Ich habe versucht, das Multiprocessing-Modul, Multithreading und Warteschlangen zu verwenden, aber es gibt nichts, was mir zu einer einfachen Implementierung schien. Das größte Problem, das ich habe, ist die Möglichkeit, die Anzahl der gleichzeitig ablaufenden Prozesse zu begrenzen. Das ist wichtig, denn wenn ich 1000 Prozesse gleichzeitig erstelle, wird es einer Gabelbombe gleichgestellt. Ich brauche die Ergebnisse von den Prozessen nicht programmgesteuert (sie auf Datenträger ausgeben), und die Prozesse werden unabhängig voneinander ausgeführt.

Kann mir bitte jemand Vorschläge oder ein Beispiel geben, wie ich das in Python oder gar Bash implementieren könnte? Ich würde den Code, den ich bisher geschrieben habe, mit Warteschlangen posten, aber es funktioniert nicht wie beabsichtigt und ist möglicherweise schon falsch.

Vielen Dank.

+2

Haben Sie [Python-Prozesspools] (http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool) ausprobiert? – C2H5OH

+0

Der einfachste Weg, dies zu tun, ist ein "Controller" -Programm zu erstellen, das den 'multiprocessing.pool' erzeugt und die Worker (program.py) -Threads erzeugt, wobei die Arbeit nach Beendigung der Instanzen neu zugewiesen wird. – jozzas

+0

Danke, ich werde es versuchen; Bei meinem ersten Versuch kam ich aus irgendeinem Grund zu dem Schluss, dass multiprocessing.pool nicht das ist, was ich wollte, aber jetzt scheint es richtig zu sein. In diesem Fall würden Worker-Threads einfach program.py erzeugen (als Thread? Mit subprocess.Popen)? Könnten Sie bitte ein ungefähres Beispiel oder eine Template-Implementierung veröffentlichen, der ich folgen könnte? – steadfast

Antwort

2

Bash-Skript, anstatt Python, aber ich verwende es oft für einfache Parallelverarbeitung:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

Oder sehr einfachen Fällen eine weitere Option ist xargs wobei P die Anzahl der Procs setzt

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3

Sie sollten einen Prozess-Supervisor verwenden. Ein Ansatz wäre die Verwendung der API von Circus, um dies "programmatisch" zu tun, die Dokumentations-Site ist jetzt offline, aber ich denke, es ist nur ein vorübergehendes Problem, Sie können den Circus dafür verwenden. Ein anderer Ansatz wäre die Verwendung des supervisord und das Setzen des Parameters numprocs des Prozesses auf die Anzahl der Kerne, die Sie haben.

Ein Beispiel unter Verwendung von Circus:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

Ich weiß, Sie erwähnt, dass die Pool.map Ansatz nicht viel Sinn für Sie macht. Die Karte ist nur eine einfache Möglichkeit, um eine Quelle der Arbeit zu geben, und eine aufrufbar für jeden der Elemente. Die func für die Karte könnte jeder Einstiegspunkt sein, um die eigentliche Arbeit an dem gegebenen arg zu tun.

Wenn das für Dich scheint nicht richtig, habe ich eine ziemlich detaillierte Antwort hier über ein Producer-Consumer-Muster mit: https://stackoverflow.com/a/11196615/496445

Im Wesentlichen erstellen Sie eine Warteschlange, und starten N Anzahl der Arbeitnehmer. Dann füttern Sie die Warteschlange entweder aus dem Hauptthread oder erstellen einen Producer-Prozess, der die Warteschlange füttert. Die Arbeiter nehmen immer nur Arbeit aus der Warteschlange und es wird niemals mehr gleichzeitige Arbeit geben als die Anzahl der Prozesse, die Sie gestartet haben.

Sie haben auch die Möglichkeit, die Warteschlange zu begrenzen, so dass sie den Erzeuger blockiert, wenn es bereits zu viel ausstehende Arbeit gibt, wenn Sie auch die Geschwindigkeit und Ressourcen des Herstellers einschränken müssen.

Die Arbeitsfunktion, die aufgerufen wird, kann alles tun, was Sie wollen. Dies kann ein Wrapper um einen Systembefehl sein, oder er kann Ihre Python-Lib importieren und die Hauptroutine ausführen. Es gibt bestimmte Prozess-Management-Systeme, die es Ihnen ermöglichen, Konfigurationen für die Ausführung von beliebigen ausführbaren Dateien unter begrenzten Ressourcen einzurichten. Dies ist jedoch nur ein einfacher Python-Ansatz.

Snippets aus diesem other answer von mir:

Grund Pool:

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

Mit Hilfe eines Prozess-Manager und Produzent

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

Sehr gutes Beispiel, ich habe es verbessert, indem ich die Anzahl von CPUS, wie sie in http://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-core erklären, bekomme und so konnte ich num_workers basierend auf dinamally setzen die CPUs der Maschine. –

0

Zwar gibt es viele Antworten zu Multiprozessing mit .pool, es gibt nicht viele Code-Schnipsel auf h Verwenden Sie Multiprocessing.Process, was in der Tat vorteilhafter ist, wenn die Speichernutzung wichtig ist. Starten von 1000 Prozessen wird die CPU überlasten und den Speicher löschen. Wenn jeder Prozess und seine Datenpipelines speicherintensiv sind, begrenzt OS oder Python selbst die Anzahl der parallelen Prozesse. Ich habe den folgenden Code entwickelt, um die gleichzeitige Anzahl von Jobs zu begrenzen, die in Stapeln an die CPU gesendet werden. Die Stapelgröße kann proportional zur Anzahl der CPU-Kerne skaliert werden. In meinem Windows-PC kann die Anzahl der Jobs pro Batch bis zu 4-mal so hoch sein wie die verfügbaren CPU-Kurse.

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

Das einzige Problem ist, wenn eine des Jobs in jedem Charge konnte nicht abgeschlossen werden und führt zu einer hängenden Situation Rest der Chargen von Arbeitsplätzen nicht eingeleitet werden. Daher muss die zu verarbeitende Funktion über geeignete Fehlerbehandlungsroutinen verfügen.

Verwandte Themen