2017-11-26 1 views
2

Ich habe eine Funktion f, die ich auf bestimmten großen Daten parallel berechnen möchte. Die Daten können auf viele Arten geteilt werden, und ich versuche eine Entscheidung darüber zu treffen, wie man sie teilt. Ich versuche zu verstehen, wie "Karte" in Multiprocessing.Pool die Daten genau verteilen/teilen, so dass ich die richtige Entscheidung über die Aufteilung meiner Daten sowie die Auswahl der Anzahl der Prozessoren treffen. Meine Eingabedaten sind nicht einfach eine Liste, wie im folgenden Beispiel, sondern eine Liste von Wörterbüchern und eine Liste von Listen, die verstehen, wie Pool.map die Daten teilt.Wie teilt die Karte Daten auf, wenn sie zusammen mit Pool in Multiprocessing verwendet werden?

Davon abgesehen denke ich, dass das Verständnis des einfachen Beispiels über den komplizierteren erzählen würde.

Das folgende Scipt zeigt, dass wir einen Pool von 5 Prozessen und die Daten in [1,2,3] wählen. Was ist die implizite Wahl, die hier getroffen wird, um die Daten zu teilen?

from multiprocessing import Pool 

def f(x): 
    return x*x 

if __name__ == '__main__': 
    p = Pool(5) 
    print(p.map(f, [1, 2, 3])) 

Antwort

0

Mein naives Verständnis ist, dass Pool einfach die Eingabeliste in der Reihenfolge verarbeitet, die ersten ‚n‘ Elemente an den Pool zurückzuschicken, und dann nach dem erste Prozess wieder verfügbar ist, wird, dass man das nächste Element, bis Es gibt keine Elemente mehr. Schließlich wartet es darauf, dass alle Elemente abgeschlossen sind, bevor es zurückkehrt.

Sie sollen ein Experiment mit einer Liste tun: [2,2,2,5,2,2,2] und eine Funktion:

def f(x): 
    sleep(x) 
    return x * x 
+0

Beziehen Sie sich hier auf ein spezifisches "n" (Anzahl der Prozesse)? – Steve

+0

Ja, der Parameter zu Pool – quamrana

5

Es ist nicht dokumentiert, so dass Sie nicht darauf verlassen sollen irgendein bestimmtes Verhalten. Sie können es erzwingen, indem Sie das optionale Argument chunksize= übergeben. Wenn Sie dies nicht tun, wird eine Heuristik verwendet, um einen Wert von chunksize für Sie zu erstellen. Dies kann in privaten Funktion _map_async(), in dem Quellbaum des Lib/multiprocessing/Pool.py finden:

def _map_async(self, func, iterable, mapper, chunksize=None, ... 
    ''' 
    Helper function to implement map, starmap and their async counterparts. 
    ''' 
    ... 
    if chunksize is None: 
     chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 
     if extra: 
      chunksize += 1 
    if len(iterable) == 0: 
     chunksize = 0 
    ... 

len(self._pool) ist die Anzahl der Arbeitsprozesse. Wenn also weniger Arbeitsaufgaben als die vierfache Anzahl der Prozesse vorhanden sind, werden sie standardmäßig einzeln verteilt. Das ist der Fall in Ihrem spezifischen Beispiel (3 <= 4*5). Wenn es sehr viel mehr Arbeitselemente als Prozesse gibt, wird die Chunk-Größe ausgewählt, so dass jedem Prozess über die Lebensdauer des map() ungefähr ein vierfaches Stück Arbeit übergeben wird. Wenn Sie beispielsweise 500 Elemente in Ihrer Liste hätten, würden 500/(5*4) == 25 und so 25 Elemente gleichzeitig an einen Arbeitsprozess übergeben.

Warum nicht 100 auf einmal, so dass jeder der 5 Arbeiter nur einmal aufgerufen würde? Weil es eine Heuristik ist ;-) Weniger als das zu überschreiten ist ein Kompromiss, der die Anzahl der Interprozess-Kommunikationen gegen den Lastenausgleich ausgleicht (die Möglichkeit, dass verschiedene Arbeitsaufgaben unterschiedliche Zeit benötigen). Aber nichts über den Lastausgleich kann im Voraus bekannt sein, so dass die Heuristik mehr Gewicht (aber nicht absolut!) Für die niedrige Anzahl von Interprozessaufrufen ergibt.

Und deshalb ist es nicht dokumentiert. Es ist durchaus möglich, dass irgendwann eine intelligentere Heuristik eingesetzt wird.

2

Sie können den Weg sehen, multiprocessing.Pool.map befasst sich mit der Arbeitsteilung zwischen Prozessen here.

Kurz gesagt, teilt es die gegebene iterable in Stücke, deren Größe die iterierbare Größe dividiert durch die Anzahl der Arbeiter Zeit 4 ist.

In Ihnen konkretes Beispiel:

In [1]: chunksize, extra = divmod(len([1,2,3]), 5 * 4) 
In [2]: if extra: 
    ...:  chunksize += 1 
    ...:  
In [3]: chunksize 
Out[3]: 1 

Es wird drei Stücke Größe ergeben 1.

Sie können die Chunkgröße sich über den chunksize Parameter steuern.

+1

Sie missverstehen den Code: Die 'divmod()' gibt eine 'chunksize' von 0 und eine' extra' von 3. Da 'extra' nicht 0 ist, wird' chunksize' erhöht von 1. –

+0

Sie haben Recht! Lass es mich reparieren. – noxdafox

Verwandte Themen