2014-10-06 3 views
9

Ich habe einige grundlegende Fragen, wenn es multiprocessing Modul zur Verwendung von Python kommt:Muss ich multiprocessing.Queue Instanzvariablen explizit an einen untergeordneten Prozess übergeben? Wird der Prozess für eine Instanzmethode ausgeführt?

class Someparallelworkerclass(object) : 

    def __init__(self): 
     self.num_workers = 4 
     self.work_queue = multiprocessing.JoinableQueue() 
     self.result_queue = multiprocessing.JoinableQueue() 

    def someparallellazymethod(self): 
     p = multiprocessing.Process(target=self.worktobedone).start() 

    def worktobedone(self): 
     # get data from work_queue 
     # put back result in result queue 

Ist es notwendig, work_queue und result_queue als args zu Process passieren? Ist die Antwort vom Betriebssystem abhängig? Die grundlegendere Frage lautet: Ermittelt der untergeordnete Prozess einen kopierten (COW-) Adressraum vom übergeordneten Prozess und kennt er die Definition der Klassen-/Klassenmethode? Wenn ja, woher weiß es, dass die Warteschlangen für IPC freigegeben werden sollen und dass es keine Duplikate von work_queue und result_queue im untergeordneten Prozess erstellen soll? Ich habe versucht, diese online zu suchen, aber die meisten der Unterlagen, die ich fand, waren vage und gingen nicht auf Details ein, als was genau darunter passiert.

Antwort

-1

Der untergeordnete Prozess erhält keinen kopierten Adressraum. Das Kind ist ein völlig separater Python-Prozess mit nichts geteilt. Ja, Sie müssen die Warteschlangen an das Kind weitergeben. Wenn Sie dies tun, übernimmt Multiprocessing automatisch die Freigabe über IPC. Siehe https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes.

+0

Dies ist nicht ganz richtig. Unter Linux wird der untergeordnete Prozess vom übergeordneten Zweig abgeleitet, sodass er tatsächlich einen schreibgeschützten Adressraum vom übergeordneten Knoten erhält. Und tatsächlich können Sie in der Warteschlange in das Kind "einsteigen" und das Ergebnis vom Elternteil "abholen", ohne die "Warteschlange" explizit an das Kind unter Linux * und * Windows zu übergeben. Die einzigen Fälle, in denen es nicht zu funktionieren scheint, sind Python 3.4+ mit Linux, die die Kontexte "spawn" oder "forkserver" verwenden. – dano

+0

Eigentlich nehme ich diesen letzten Satz zurück. Das war auf einen Fehler von mir zurückzuführen. Sie können die Warteschlangenobjekte immer implizit übergeben, unabhängig vom Kontext/der Plattform. – dano

+0

+ dano, gute Antwort. Du bist eindeutig ein Multiprozessor-Guru! – Matt

7

Es ist eigentlich nicht notwendig, die Warteschlangen in diesem Fall in das Argument args aufzunehmen, egal welche Plattform Sie verwenden. Der Grund ist, dass, obwohl es nicht so aussieht, dass Sie die beiden JoinableQueue Instanzen explizit an das Kind weitergeben, Sie tatsächlich - über self. Da self explizit an das Kind übergeben wird und die beiden Warteschlangen ein Teil von self sind, werden sie am Kind weitergegeben.

Unter Linux geschieht dies über os.fork(), was bedeutet, dass Filedeskriptoren von den multiprocessing.connection.Connection Objekten verwendet, die die Queue intern verwendet für die Kommunikation zwischen Prozessen sind durch das Kind vererbt (nicht kopiert). Andere Teile der Queue werden copy-on-write, aber das ist in Ordnung; multiprocessing.Queue ist so konzipiert, dass keines der Teile, die kopiert werden müssen, tatsächlich zwischen den beiden Prozessen synchron bleiben muss. In der Tat, erhalten viele der internen Attribute zurückgesetzt, nachdem der fork auftritt:

def _after_fork(self): 
    debug('Queue._after_fork()') 
    self._notempty = threading.Condition(threading.Lock()) 
    self._buffer = collections.deque() 
    self._thread = None 
    self._jointhread = None 
    self._joincancelled = False 
    self._closed = False 
    self._close = None 
    self._send = self._writer.send # _writer is a 
    self._recv = self._reader.recv 
    self._poll = self._reader.poll 

Damit deckt Linux. Wie wäre es mit Windows? Windows hat fork nicht, also muss es self picken, um es dem Kind zu schicken, und das schließt das Beizen unserer Queues mit ein. Nun, in der Regel, wenn Sie versuchen, ein multiprocessing.Queue beizen, es scheitert:

>>> import multiprocessing 
>>> q = multiprocessing.Queue() 
>>> import pickle 
>>> pickle.dumps(q) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex 
    dict = getstate() 
    File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__ 
    assert_spawning(self) 
    File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning 
    ' through inheritance' % type(self).__name__ 
RuntimeError: Queue objects should only be shared between processes through inheritance 

Aber das ist eigentlich eine künstliche Beschränkung. multiprocessing.Queue Objekte können in einigen Fällen eingelegt werden - wie sonst könnten sie an untergeordnete Prozesse in Windows gesendet werden? Und in der Tat können wir sehen, ob wir bei der Umsetzung aussehen:

def __getstate__(self): 
    assert_spawning(self) 
    return (self._maxsize, self._reader, self._writer, 
      self._rlock, self._wlock, self._sem, self._opid) 

def __setstate__(self, state): 
    (self._maxsize, self._reader, self._writer, 
    self._rlock, self._wlock, self._sem, self._opid) = state 
    self._after_fork() 

__getstate__, die aufgerufen wird, wenn eine Instanz Beizen, hat einen assert_spawning Anruf darin, die sicherstellt, wir tatsächlich einen Prozess Laichen beim Versuch, die Gurke *. __setstate__, die beim Entpacken aufgerufen wird, ist verantwortlich für den Aufruf _after_fork.

Also wie werden die Connection Objekte von den Warteschlangen verwendet, wenn wir pürieren müssen?Es stellt sich heraus, dass es ein multiprocessing Untermodul gibt, das genau das tut - multiprocessing.reduction. Der Kommentar am Anfang des Moduls heißt es ziemlich deutlich:

# 
# Module to allow connection and socket objects to be transferred 
# between processes 
# 

Unter Windows das Modul schließlich den von Windows bereitgestellt DuplicateHandle API verwendet, kann einen doppelten Griff, dass das Kind Prozess Connection Objekt verwenden zu erstellen. So, während jeder Prozess seinen eigenen Griff bekommt, sind sie genaue Duplikate - jede Aktion, die an einer gemacht wird, spiegelt sich auf der anderen:

Der doppelte Griff bezieht sich auf das gleiche Objekt wie der ursprüngliche Griff. Daher werden alle Änderungen am Objekt über beide Handles wiedergegeben. Wenn Sie beispielsweise ein Datei-Handle duplizieren, ist die aktuelle Position der Datei für beide Handles immer gleich.

* Siehe this answer für weitere Informationen über assert_spawning

1

Das Kind Prozess nicht die Warteschlangen in seiner Schließung hat. Es sind Instanzen der Warteschlangen, die auf verschiedene Speicherbereiche verweisen. Wenn Sie Warteschlangen so verwenden, wie Sie beabsichtigen, müssen Sie sie als Argumente an die Funktion übergeben. Eine Lösung, die ich mag, ist, functools.partial zu verwenden, um Ihre Funktionen mit den gewünschten Warteschlangen zu curry zu machen, sie dauerhaft zu seiner Schließung hinzuzufügen und Ihnen mehrere Threads zu ermöglichen, dieselbe Aufgabe mit dem gleichen IPC-Kanal auszuführen.

Verwandte Themen