2014-07-16 3 views
23

Ich versuche ein wenig zu verstehen, was hinter den Kulissen passiert, wenn die Methode apply_sync eines Multiprocessing-Pools verwendet wird.Wer führt den Rückruf aus, wenn die apply_async-Methode eines Multiprocessing-Pools verwendet wird?

Wer führt die Callback-Methode aus? Ist es der Hauptprozess, der apply_async aufgerufen hat?

Angenommen, ich sende eine ganze Reihe von apply_async-Befehlen mit Rückrufen und dann mit meinem Programm fort. Mein Programm macht immer noch Dinge, wenn der Start von apply_async beendet wird. Wie führt der Callback den "Hauptprozess" durch, während der Hauptprozess noch mit dem Skript beschäftigt ist?

Hier ist ein Beispiel.

import multiprocessing 
import time 

def callback(x): 
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x) 

def func(x): 
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x) 
    return x 

pool = multiprocessing.Pool() 

args = range(20) 

for a in args: 
    pool.apply_async(func, (a,), callback=callback) 

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name) 

t0 = time.time() 
while time.time() - t0 < 60: 
    pass 

print 'Finished with the script' 

Der Ausgang ist so etwas wie

PoolWorker-1 läuft func mit ARG 0

PoolWorker-2 ausgeführt func mit ARG 1

PoolWorker-3 Lauf func mit ARG 2

MainProcess geht für eine Minute schlafen < - Hauptprozess ist damit beschäftigt

PoolWorker-4 läuft func mit ARG 3

PoolWorker-1 Lauf func mit ARG 4

PoolWorker-2 ausgeführt func mit ARG 5

PoolWorker-3 func Lauf mit ARG 6

PoolWorker-4 läuft func mit ARG 7

MainProcess Callback-w ith arg 0 < - Hauptprozess läuft Callback, während es noch in der While-Schleife ist !!

MainProcess Callback mit ARG 1

MainProcess Callback mit ARG 2

MainProcess Callback mit ARG 3

MainProcess Callback mit ARG 4

PoolWorker-1 läuft func mit arg 8

...

mit Skript Finished

Wie wird MainProcess den Rückruf läuft, während es in der Mitte dieses while-Schleife ist ??

Es gibt diese Aussage über den Rückruf in der Dokumentation für multiprocessing.Pool, die wie ein Hinweis scheint, aber ich verstehe es nicht.

apply_async (func [, args [, kwds [, Rückruf]]])

Eine Variante des apply() Methode, die ein Ergebnis-Objekt zurückgibt.

Wenn Callback angegeben ist, sollte es eine Callable sein, die ein einzelnes Argument akzeptiert. Wenn das Ergebnis bereit ist, wird ein Rückruf darauf angewendet (sofern der Anruf nicht fehlgeschlagen ist). Der Rückruf sollte sofort abgeschlossen werden, da sonst der Thread, der die Ergebnisse behandelt, blockiert wird.

Antwort

25

Es ist in der Tat ein Hinweis in der Dokumentation:

Rückruf seit vervollständigen sofort sollte sonst der Faden, die die Ergebnisse Griffe wird blockiert werden.

Die Rückrufe im Hauptprozess behandelt werden, aber sie in einem eigenen Thread laufen. Wenn Sie eine Pool schaffen es tatsächlich schafft ein paar Thread Objekte intern:

class Pool(object): 
    Process = Process 

    def __init__(self, processes=None, initializer=None, initargs=(), 
       maxtasksperchild=None): 
     self._setup_queues() 
     self._taskqueue = Queue.Queue() 
     self._cache = {} 
     ... # stuff we don't care about 
     self._worker_handler = threading.Thread(
      target=Pool._handle_workers, 
      args=(self,) 
      ) 
     self._worker_handler.daemon = True 
     self._worker_handler._state = RUN 
     self._worker_handler.start() 

     self._task_handler = threading.Thread(
      target=Pool._handle_tasks, 
      args=(self._taskqueue, self._quick_put, self._outqueue, 
        self._pool, self._cache) 
      ) 
     self._task_handler.daemon = True 
     self._task_handler._state = RUN 
     self._task_handler.start() 

     self._result_handler = threading.Thread(
      target=Pool._handle_results, 
      args=(self._outqueue, self._quick_get, self._cache) 
      ) 
     self._result_handler.daemon = True 
     self._result_handler._state = RUN 
     self._result_handler.start() 

Die interessanten Thread für uns ist _result_handler; Wir werden in Kürze zu dem Thema kommen.

Getriebe für einen zweiten Schalter, wenn Sie apply_async laufen, es entsteht ein ApplyResult Objekt intern von dem Kind immer das Ergebnis zu verwalten:

def apply_async(self, func, args=(), kwds={}, callback=None): 
    assert self._state == RUN 
    result = ApplyResult(self._cache, callback) 
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) 
    return result 

class ApplyResult(object): 

    def __init__(self, cache, callback): 
     self._cond = threading.Condition(threading.Lock()) 
     self._job = job_counter.next() 
     self._cache = cache 
     self._ready = False 
     self._callback = callback 
     cache[self._job] = self 


    def _set(self, i, obj): 
     self._success, self._value = obj 
     if self._callback and self._success: 
      self._callback(self._value) 
     self._cond.acquire() 
     try: 
      self._ready = True 
      self._cond.notify() 
     finally: 
      self._cond.release() 
     del self._cache[self._job] 

Wie Sie die _set Methode sehen kann, ist die, die Enden up tatsächlich Ausführen der callback übergeben, vorausgesetzt, die Aufgabe war erfolgreich. Beachten Sie auch, dass es sich zu einem globalen cache dict am Ende __init__ ergänzt.

Jetzt zurück zum _result_handler Thread-Objekt. Das Objekt ruft die _handle_results Funktion, die wie folgt aussieht:

while 1: 
     try: 
      task = get() 
     except (IOError, EOFError): 
      debug('result handler got EOFError/IOError -- exiting') 
      return 

     if thread._state: 
      assert thread._state == TERMINATE 
      debug('result handler found thread._state=TERMINATE') 
      break 

     if task is None: 
      debug('result handler got sentinel') 
      break 

     job, i, obj = task 
     try: 
      cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called! 
     except KeyError: 
      pass 

     # More stuff 

Es ist eine Schleife, die nur Ergebnisse von Kindern aus Warteschlange zieht, findet den entsprechenden Eintrag in cache und fordert _set, die unseren Rückruf ausführt. Es kann ausgeführt werden, obwohl Sie sich in einer Schleife befinden, da es nicht im Hauptthread ausgeführt wird.

+0

Danke Dano, dass du dir die Zeit genommen hast, eine so detaillierte Antwort zu schreiben! Wenn ich richtig verstehe, erzeugt der Pool einen * einzigen * neuen Thread (den result_handler), dessen Aufgabe es ist, nur darauf zu warten, dass apply_async beendet wird und dann den Callback im Thread von result_handler aufruft (der Teil des MainProcess ist). Werden die Callbacks (für ein einzelnes Pool-Objekt) sequenziell aufgerufen? I.e. Ein Bündel von apply_async's kann zusammen enden, aber die Callbacks werden nacheinander von result_handler? – Alex

+1

Eine weitere Frage. Was passiert, wenn die Callback-Funktion und das Hauptscript mit denselben Objekten (im MainProcess) verschmutzen? Kann es unvorhersehbares Verhalten geben? I.e. wenn der Callback und etwas später im Hauptskript beide versuchen, in die gleiche Datei zu schreiben oder das gleiche Array zu modifizieren. Wenn der Callback tatsächlich ausgeführt wird, weiß wer, was das Hauptscript zu diesem Zeitpunkt macht. – Alex

+4

@Alex Ja, die Callbacks werden nacheinander ausgeführt.Der '_result_handler'-Thread zieht eine abgeschlossene Task aus der Warteschlange, ruft' _set' (die den Callback ausführt) auf und geht dann zum nächsten über. Aus diesem Grund wird in der Dokumentation darauf hingewiesen, dass der Rückruf sofort abgeschlossen wird. Durch das Ausführen des Rückrufs werden andere Ergebnisse von der Verarbeitung blockiert. – dano

Verwandte Themen