2012-06-15 10 views
9

Wenn eine große Anzahl von Aufgaben (mit großen Parametern) mit Pool.apply_async ausgeführt wird, werden die Prozesse zugeordnet und in einen Wartezustand versetzt, und es gibt keine Begrenzung für die Anzahl der wartenden Prozesse. Dies kann unter durch den Verzehr der gesamten Speichers, wie im Beispiel am Ende:Python Multiprocessing: Wie kann die Anzahl der wartenden Prozesse begrenzt werden?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

ich für eine Art und Weise bin auf der Suche die Warteschlange zu begrenzen, so dass es nur eine begrenzte Anzahl von wartenden Prozessen ist, und Pool.apply_async ist blockiert, während die Warteschlange voll ist.

+0

Nizza Beispiel (+1). – mgilson

Antwort

6

multiprocessing.Pool hat eine _taskqueue Mitglied des Typs multiprocessing.Queue, die einen optionalen maxsize Parameter übernimmt; Leider baut es ohne den maxsize Parametersatz.

würde ich Subklassifizieren multiprocessing.Pool mit einer Copy-Paste von multiprocessing.Pool.__init__ empfehlen, die maxsize-_taskqueue Konstruktor übergibt.

Monkey-Patching das Objekt (entweder den Pool oder die Warteschlange) auch funktionieren würde, aber Sie würden pool._taskqueue._maxsize und pool._taskqueue._sem monkeypatch haben, so wäre es recht spröde sein:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

Ich benutze Python 2.7.3, und die _taskqueue ist vom Typ Queue.Queue. Es bedeutet, dass es sich um eine einfache Queue handelt und nicht um eine Multiprocessing.Queue. Subclassing Multiprocessing.Pool und überschreiben __init__ funktioniert gut, aber Affen-Patching das Objekt funktioniert nicht wie erwartet. Dies ist jedoch der Hack, nach dem ich gesucht habe, danke. –

0

Sie explizite Queue hinzufügen könnte mit maxsize-Parameter und verwenden Sie in diesem Fall queue.put() anstelle von pool.apply_async(). Dann Arbeitsprozesse könnten:

for a, b in iter(queue.get, sentinel): 
    # process it 

Wenn Sie die Anzahl der erzeugten Eingabeargumente/Ergebnisse einschränken möchten, die in etwa der Zahl der aktiven Arbeitsprozesse im Speicher sind, dann könnten Sie pool.imap*() Methoden verwenden:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

Die Verwendung von 'imap' macht keinen Unterschied. Die Eingabewarteschlange ist immer noch unbegrenzt und die Verwendung dieser Lösung wird am Ende alle Speicher verbrauchen. – Radim

+0

@Radim: der 'imap' Code in der Antwort funktioniert auch wenn Sie ihm einen unendlichen Generator geben. – jfs

+0

Nicht in Python 2, leider (Code in py3 nicht angeschaut). Für einige Workarounds siehe [diese SO-Antwort] (http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration). – Radim

1

warten, wenn pool._taskqueue über die gewünschte Größe ist:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

Ich möchte nur hinzufügen, dass ich dies als die einfachste Lösung für meine Speicherprobleme mit Multiprocessing gefunden habe. Ich habe max_apply_size = 10 verwendet und das funktioniert gut für mein Problem, das ist eine langsame Dateikonvertierung. Die Verwendung eines Semaphors, wie es @ecatmur vorschlägt, scheint eine robustere Lösung zu sein, könnte aber für einfache Skripte zuviel sein. – Nate

Verwandte Themen