2016-05-06 12 views
4

Ich habe mehrere Stunden auf verschiedene Versuche, meine Zahlenverarbeitung zu parallelisieren, aber es wird nur langsamer, wenn ich dies tue. Leider verschwindet das Problem, wenn ich versuche, es auf das unten stehende Beispiel zu reduzieren und ich möchte hier nicht wirklich das gesamte Programm posten. Die Frage ist also: Welche Fallstricke sollte ich bei dieser Art von Programm vermeiden?Minimieren Sie Overhead in Python Multiprocessing.Pool mit numpy/scipy

. (Anmerkung: Follow-up nach der Antwort des Unutbu ist an der Unterseite ist)

Hier sind die Umstände:

  • Es geht um ein Modul, das eine Klasse BigData mit vielen internen Daten definiert. Im Beispiel gibt es eine Liste ff der Interpolationsfunktionen; in dem tatsächlichen Programm gibt es mehr, z. B. ffA[k], ffB[k], ffC[k].
  • Die Berechnung würde als "peinlich parallel" klassifiziert: Die Arbeit kann auf kleineren Datenblöcken gleichzeitig durchgeführt werden. Im Beispiel ist das do_chunk().
  • Der im Beispiel gezeigte Ansatz würde in meinem eigentlichen Programm zu der schlechtesten Leistung führen: ungefähr 1 Sekunde pro Block (zusätzlich zu 0,1 Sekunden der tatsächlichen Berechnungszeit in einem einzigen Thread). Also, für n = 50 würde do_single() in 5 Sekunden laufen und do_multi() würde in 55 Sekunden laufen.
  • Ich habe auch versucht, die Arbeit zu spalten, indem die xi und yi Arrays in zusammenhängenden Blöcken Schneiden und alle k Werte in jeder Chunk iterieren. Das hat ein bisschen besser funktioniert. Jetzt gab es keinen Unterschied in der gesamten Ausführungszeit, ob ich 1, 2, 3 oder 4 Threads verwendete. Aber natürlich möchte ich eine tatsächliche Beschleunigung sehen!
  • Dies kann verwandt sein: Multiprocessing.Pool makes Numpy matrix multiplication slower. An anderer Stelle im Programm verwendete ich jedoch einen Multiprocessing-Pool für Berechnungen, die sehr viel isolierter waren: eine Funktion (nicht an eine Klasse gebunden), die ungefähr so ​​aussieht wie def do_chunk(array1, array2, array3) und Berechnungen nur für diesen Array durchführt. Dort gab es einen deutlichen Geschwindigkeitsschub.
  • Die CPU-Auslastung wird wie erwartet mit der Anzahl der parallelen Prozesse skaliert (300% CPU-Auslastung für drei Threads).
#!/usr/bin/python2.7 

import numpy as np, time, sys 
from multiprocessing import Pool 
from scipy.interpolate import RectBivariateSpline 

_tm=0 
def stopwatch(msg=''): 
    tm = time.time() 
    global _tm 
    if _tm==0: _tm = tm; return 
    print("%s: %.2f seconds" % (msg, tm-_tm)) 
    _tm = tm 

class BigData: 
    def __init__(self, n): 
     z = np.random.uniform(size=n*n*n).reshape((n,n,n)) 
     self.ff = [] 
     for i in range(n): 
      f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1) 
      self.ff.append(f) 
     self.n = n 

    def do_chunk(self, k, xi, yi): 
     s = np.sum(np.exp(self.ff[k].ev(xi, yi))) 
     sys.stderr.write(".") 
     return s 

    def do_multi(self, numproc, xi, yi): 
     procs = [] 
     pool = Pool(numproc) 
     stopwatch('Pool setup') 
     for k in range(self.n): 
      p = pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi)) 
      procs.append(p) 
     stopwatch('Jobs queued (%d processes)' % numproc) 
     sum = 0.0 
     for k in range(self.n): 
      # Edit/bugfix: replaced p.get by procs[k].get 
      sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt 
      if k == 0: stopwatch("\nFirst get() done") 
     stopwatch('Jobs done') 
     pool.close() 
     pool.join() 
     return sum 

    def do_single(self, xi, yi): 
     sum = 0.0 
     for k in range(self.n): 
      sum += self.do_chunk(k, xi, yi) 
     stopwatch('\nAll in single process') 
     return sum 

def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk 
    return bd.do_chunk(k, xi, yi)   

if __name__ == "__main__": 
    stopwatch() 
    n = 50 
    bd = BigData(n) 
    m = 1000*1000 
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m)) 
    stopwatch('Initialized') 
    bd.do_multi(2, xi, yi) 
    bd.do_multi(3, xi, yi) 
    bd.do_single(xi, yi) 

Der Ausgang:

Initialized: 0.06 seconds 
Pool setup: 0.01 seconds 
Jobs queued (2 processes): 0.03 seconds 
.. 
First get() done: 0.34 seconds 
................................................Jobs done: 7.89 seconds 
Pool setup: 0.05 seconds 
Jobs queued (3 processes): 0.03 seconds 
.. 
First get() done: 0.50 seconds 
................................................Jobs done: 6.19 seconds 
.................................................. 
All in single process: 11.41 seconds 

Timings sind auf einem Intel Core i3-3227 CPU mit 2 Kerne, 4 Threads, 64-Bit-Linux läuft. Für das eigentliche Programm war die Multi-Processing-Version (Pool-Mechanismus, selbst wenn nur ein Kern verwendet wurde) um Faktor 10 langsamer als die Single-Process-Version.

Follow-up

Unutbu Antwort hat mich auf dem richtigen Weg. Im aktuellen Programm wurde self in ein 37 bis 140 MB-Objekt gebeizt, das an die Worker-Prozesse übergeben werden musste. Schlimmer noch, das Python-Beizen ist sehr langsam; Das Beizen selbst dauerte ein paar Sekunden, was für jeden Arbeitsteil passierte, der an die Arbeitsprozesse weitergegeben wurde. Anders als das Beizen und Übergeben von großen Datenobjekten ist der Overhead von apply_async in Linux sehr klein; Für eine kleine Funktion (Hinzufügen einiger ganzzahliger Argumente) dauert es nur 0,2 ms pro 0 Paar. So ist die Aufteilung der Arbeit in sehr kleinen Brocken kein Problem für sich. Also übertrage ich alle großen Array-Argumente als Indizes zu globalen Variablen.Ich halte die Chunk-Größe klein für den Zweck der CPU-Cache-Optimierung.

Die globalen Variablen werden in einem globalen dict gespeichert; Die Einträge werden sofort im übergeordneten Prozess gelöscht, nachdem der Worker-Pool eingerichtet wurde. Nur die Schlüssel zu dict werden an den Arbeitsprozess übertragen. Die einzigen großen Daten für das Beizen/IPC sind die neuen Daten, die von den Mitarbeitern erstellt werden.

#!/usr/bin/python2.7 

import numpy as np, sys 
from multiprocessing import Pool 

_mproc_data = {} # global storage for objects during multiprocessing. 

class BigData: 
    def __init__(self, size): 
     self.blah = np.random.uniform(0, 1, size=size) 

    def do_chunk(self, k, xi, yi): 
     # do the work and return an array of the same shape as xi, yi 
     zi = k*np.ones_like(xi) 
     return zi 

    def do_all_work(self, xi, yi, num_proc): 
     global _mproc_data 
     mp_key = str(id(self)) 
     _mproc_data['bd'+mp_key] = self # BigData 
     _mproc_data['xi'+mp_key] = xi 
     _mproc_data['yi'+mp_key] = yi 
     pool = Pool(processes=num_proc) 
     # processes have now inherited the global variabele; clean up in the parent process 
     for v in ['bd', 'xi', 'yi']: 
      del _mproc_data[v+mp_key] 

     # setup indices for the worker processes (placeholder) 
     n_chunks = 45 
     n = len(xi) 
     chunk_len = n//n_chunks 
     i1list = np.arange(0,n,chunk_len) 
     i2list = i1list + chunk_len 
     i2list[-1] = n 
     klist = range(n_chunks) # placeholder 

     procs = [] 
     for i in range(n_chunks): 
      p = pool.apply_async(_do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i])) 
      sys.stderr.write(".") 
      procs.append(p) 
     sys.stderr.write("\n") 

     # allocate space for combined results 
     zi = np.zeros_like(xi) 

     # get data from workers and finish 
     for i, p in enumerate(procs): 
      zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling 

     pool.close() 
     pool.join() 

     return zi 

def _do_chunk_wrapper(key, i1, i2, k): 
    """All arguments are small objects.""" 
    global _mproc_data 
    bd = _mproc_data['bd'+key] 
    xi = _mproc_data['xi'+key][i1:i2] 
    yi = _mproc_data['yi'+key][i1:i2] 
    return bd.do_chunk(k, xi, yi) 


if __name__ == "__main__": 
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001) 
    bd = BigData(int(1e7)) 
    bd.do_all_work(xi, yi, 4) 

Hier werden die Ergebnisse einer Geschwindigkeitstest sind (wiederum 2 Kerne, 4 threads), die Anzahl der Arbeitsprozesse unterschiedlicher und die Menge an Speicher in den Chunks (total Bytes des xi, yi, zi Array-Scheiben). Die Zahlen sind in "Millionen Ergebniswerte pro Sekunde", aber das ist für den Vergleich nicht so wichtig. Die Zeile für "1 Prozess" ist ein direkter Aufruf an do_chunk mit den vollen Eingabedaten, ohne irgendwelche Unterprozesse.

Der Einfluss der Datengröße im Speicher ist ziemlich signifikant. Die CPU verfügt über 3 MB gemeinsamen L3-Cache sowie 256 KB L2-Cache pro Kern. Beachten Sie, dass die Berechnung auch Zugriff auf mehrere MB interner Daten des Objekts BigData benötigt. Was wir daraus lernen, ist also, dass es sinnvoll ist, diese Art von Geschwindigkeitstest durchzuführen. Für dieses Programm sind 2 Prozesse am schnellsten, gefolgt von 4 und 3 am langsamsten.

+0

Abgesehen: Haben Sie sich in [dask] (http: //dask.pydata .org/de/letzte /)? Es kann viele Ihrer Multiprocessing-Aufgaben vereinfachen. –

+0

@ajcr Noch nicht. Aber jetzt möchte ich die Anzahl der externen Abhängigkeiten minimieren, da ich diese möglicherweise auf einem Server ausführen muss, auf dem ich keine Administratorrechte habe, und ihn mit anderen Leuten teilen kann, die die gleiche Einschränkung haben. –

+0

Auch das macht mir Angst: "Dask-Arrays implementieren eine ** Teilmenge ** der NumPy-Schnittstelle auf großen Arrays". Das klingt nach einer Menge potentieller Arbeit, um mit existierendem Code zu interagieren. –

Antwort

6

Versuchen Sie, die Interprozesskommunikation zu reduzieren. In dem multiprocessing Modul alle Interprozesskommunikation (Einzel-Computer) über Warteschlangen. Objekte, die durch eine Warteschlange übergeben wurden, sind gebeizt. Versuchen Sie also, weniger und/oder kleinere Objekte durch die Warteschlange zu senden.

  • self nicht senden Sie, die Instanz von BigData durch die Queue. Es ist ziemlich groß, und wird größer als die Menge der Menge der Daten in self wächst:

    In [6]: import pickle 
    In [14]: len(pickle.dumps(BigData(50))) 
    Out[14]: 1052187 
    

    Jeden Zeit pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi)) genannt wird, self im Hauptprozess und ungebeizte in dem Arbeitsprozess gebeizt. Die Größe von len(pickle.dumps(BigData(N))) wächst um N steigt.

  • Lassen Sie die Daten von einer globalen Variablen gelesen werden. Unter Linux können Sie Copy-on-Write nutzen. Wie Jan-Philip Gehrcke explains:

    Nach fork() sind Eltern und Kind in einem gleichwertigen Zustand. Es wäre dumm, den gesamten Speicher des Elternteils an einen anderen Ort im RAM zu kopieren. Das ist der Punkt, an dem das Copy-on-Write-Prinzip ankommt. Solange das Kind seinen Speicherzustand nicht ändert, greift es tatsächlich auf den Speicher des Elterns zu. Nur bei einer Modifikation werden die entsprechenden Bits und Stücke in den Speicherraum des Kindes kopiert.

    So können Sie Vergehen Instanzen von BigData durch die Queue , indem einfach die Definition der Instanz als globaler vermeiden, bd = BigData(n), (wie Sie bereits tun) und unter Hinweis auf seine Werte in den Arbeitsprozessen (zB _do_chunk_wrapper) .Es beläuft sich grundsätzlich auf self aus dem Aufruf von pool.apply_async Entfernung:

    p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi)) 
    

    und bd als globaler Zugriff auf und die notwendigen begleitenden Änderungen do_chunk_wrapper ‚s Call-Signatur.

  • Versuchen Sie, die länger laufenden Funktionen func an pool.apply_async zu übergeben. Wenn Sie viele schnell ablaufende Aufrufe an pool.apply_async haben, wird der Overhead der Übergabe von Argumenten und Rückgabewerten durch die Warteschlange zu einem wesentlichen Teil der Gesamtzeit. Wenn Sie stattdessen weniger Aufrufe an pool.apply_async tätigen und jedem func mehr Arbeit geben, bevor ein Ergebnis zurückgegeben wird, wird die Interprozesskommunikation zu einem kleineren Bruchteil der Gesamtzeit.

    Unten modifizierte I _do_chunk_wrapperk_start und k_end Argumente zu akzeptieren, so dass jeder Aufruf von pool.apply_async würde die Summe für viele Werte von k berechnen, bevor ein Ergebnis zurück.


import math 
import numpy as np 
import time 
import sys 
import multiprocessing as mp 
import scipy.interpolate as interpolate 

_tm=0 
def stopwatch(msg=''): 
    tm = time.time() 
    global _tm 
    if _tm==0: _tm = tm; return 
    print("%s: %.2f seconds" % (msg, tm-_tm)) 
    _tm = tm 

class BigData: 
    def __init__(self, n): 
     z = np.random.uniform(size=n*n*n).reshape((n,n,n)) 
     self.ff = [] 
     for i in range(n): 
      f = interpolate.RectBivariateSpline(
       np.arange(n), np.arange(n), z[i], kx=1, ky=1) 
      self.ff.append(f) 
     self.n = n 

    def do_chunk(self, k, xi, yi): 
     n = self.n 
     s = np.sum(np.exp(self.ff[k].ev(xi, yi))) 
     sys.stderr.write(".") 
     return s 

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi): 
     s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi))) 
        for k in range(k_start, k_end)) 
     sys.stderr.write(".") 
     return s 

    def do_multi(self, numproc, xi, yi): 
     procs = [] 
     pool = mp.Pool(numproc) 
     stopwatch('\nPool setup') 
     ks = list(map(int, np.linspace(0, self.n, numproc+1))) 
     for i in range(len(ks)-1): 
      k_start, k_end = ks[i:i+2] 
      p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi)) 
      procs.append(p) 
     stopwatch('Jobs queued (%d processes)' % numproc) 
     total = 0.0 
     for k, p in enumerate(procs): 
      total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt 
      if k == 0: stopwatch("\nFirst get() done") 
     print(total) 
     stopwatch('Jobs done') 
     pool.close() 
     pool.join() 
     return total 

    def do_single(self, xi, yi): 
     total = 0.0 
     for k in range(self.n): 
      total += self.do_chunk(k, xi, yi) 
     stopwatch('\nAll in single process') 
     return total 

def _do_chunk_wrapper(k_start, k_end, xi, yi): 
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)   

if __name__ == "__main__": 
    stopwatch() 
    n = 50 
    bd = BigData(n) 
    m = 1000*1000 
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m)) 
    stopwatch('Initialized') 
    bd.do_multi(2, xi, yi) 
    bd.do_multi(3, xi, yi) 
    bd.do_single(xi, yi) 

Erträge

Initialized: 0.15 seconds 

Pool setup: 0.06 seconds 
Jobs queued (2 processes): 0.00 seconds 

First get() done: 6.56 seconds 
83963796.0404 
Jobs done: 0.55 seconds 
.. 
Pool setup: 0.08 seconds 
Jobs queued (3 processes): 0.00 seconds 

First get() done: 5.19 seconds 
83963796.0404 
Jobs done: 1.57 seconds 
... 
All in single process: 12.13 seconds 

im Vergleich zum ursprünglichen Code:

Initialized: 0.10 seconds 
Pool setup: 0.03 seconds 
Jobs queued (2 processes): 0.00 seconds 

First get() done: 10.47 seconds 
Jobs done: 0.00 seconds 
.................................................. 
Pool setup: 0.12 seconds 
Jobs queued (3 processes): 0.00 seconds 

First get() done: 9.21 seconds 
Jobs done: 0.00 seconds 
.................................................. 
All in single process: 12.12 seconds 
+0

Das macht Sinn, danke. In meinem eigentlichen Programm ist 'BigData' 37 MB, gebeizt. Ich dachte, dass IPC schnell genug ist, um 37 MB in einem Bruchteil einer Sekunde zu bewältigen, aber der wirkliche Engpass scheint zu sein, dass "pickle.loads (pickle.dumps (bigdata))" 2,8 Sekunden auf meinem System benötigt! Jetzt kann ich den Multiprocessing-Code zum vierten Mal fortsetzen ... Ich muss sicherstellen, dass das Buchhaltungsrecht für die globale Variable, vielleicht einen Variablennamen wie bigdata_ , und stellen Sie sicher, dass alle Änderungen an Die Variable wird im übergeordneten Prozess ausgeführt. –

+0

Und der Grund für kleine Chunks ist, dass ich alle Daten, die im CPU-Cache verarbeitet werden, behalten wollte. Die Speicherbandbreite scheint jedoch ein kleinerer Faktor zu sein. –

+0

Ja, das ist eine weitere wichtige Überlegung. Oben habe ich 'ks = list (map (int, np.linspace (0, self.n, numproc + 1)))' gesetzt. Dies bedeutet, dass die Anzahl der Aufrufe von "apply_async" gleich "numproc" ist. Vielleicht möchten Sie 'ks = list (map (int, np.linspace (0, self.n, numchunks + 1)))' 'verwenden und mit anderen Werten von' numchunks' experimentieren. – unutbu

Verwandte Themen