Ich habe mehrere Stunden auf verschiedene Versuche, meine Zahlenverarbeitung zu parallelisieren, aber es wird nur langsamer, wenn ich dies tue. Leider verschwindet das Problem, wenn ich versuche, es auf das unten stehende Beispiel zu reduzieren und ich möchte hier nicht wirklich das gesamte Programm posten. Die Frage ist also: Welche Fallstricke sollte ich bei dieser Art von Programm vermeiden?Minimieren Sie Overhead in Python Multiprocessing.Pool mit numpy/scipy
. (Anmerkung: Follow-up nach der Antwort des Unutbu ist an der Unterseite ist)
Hier sind die Umstände:
- Es geht um ein Modul, das eine Klasse
BigData
mit vielen internen Daten definiert. Im Beispiel gibt es eine Listeff
der Interpolationsfunktionen; in dem tatsächlichen Programm gibt es mehr, z. B.ffA[k]
,ffB[k]
,ffC[k]
. - Die Berechnung würde als "peinlich parallel" klassifiziert: Die Arbeit kann auf kleineren Datenblöcken gleichzeitig durchgeführt werden. Im Beispiel ist das
do_chunk()
. - Der im Beispiel gezeigte Ansatz würde in meinem eigentlichen Programm zu der schlechtesten Leistung führen: ungefähr 1 Sekunde pro Block (zusätzlich zu 0,1 Sekunden der tatsächlichen Berechnungszeit in einem einzigen Thread). Also, für n = 50 würde
do_single()
in 5 Sekunden laufen unddo_multi()
würde in 55 Sekunden laufen. - Ich habe auch versucht, die Arbeit zu spalten, indem die
xi
undyi
Arrays in zusammenhängenden Blöcken Schneiden und allek
Werte in jeder Chunk iterieren. Das hat ein bisschen besser funktioniert. Jetzt gab es keinen Unterschied in der gesamten Ausführungszeit, ob ich 1, 2, 3 oder 4 Threads verwendete. Aber natürlich möchte ich eine tatsächliche Beschleunigung sehen! - Dies kann verwandt sein: Multiprocessing.Pool makes Numpy matrix multiplication slower. An anderer Stelle im Programm verwendete ich jedoch einen Multiprocessing-Pool für Berechnungen, die sehr viel isolierter waren: eine Funktion (nicht an eine Klasse gebunden), die ungefähr so aussieht wie
def do_chunk(array1, array2, array3)
und Berechnungen nur für diesen Array durchführt. Dort gab es einen deutlichen Geschwindigkeitsschub. - Die CPU-Auslastung wird wie erwartet mit der Anzahl der parallelen Prozesse skaliert (300% CPU-Auslastung für drei Threads).
#!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = Pool(numproc)
stopwatch('Pool setup')
for k in range(self.n):
p = pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
sum = 0.0
for k in range(self.n):
# Edit/bugfix: replaced p.get by procs[k].get
sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
stopwatch('Jobs done')
pool.close()
pool.join()
return sum
def do_single(self, xi, yi):
sum = 0.0
for k in range(self.n):
sum += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return sum
def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
Der Ausgang:
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
Timings sind auf einem Intel Core i3-3227 CPU mit 2 Kerne, 4 Threads, 64-Bit-Linux läuft. Für das eigentliche Programm war die Multi-Processing-Version (Pool-Mechanismus, selbst wenn nur ein Kern verwendet wurde) um Faktor 10 langsamer als die Single-Process-Version.
Follow-up
Unutbu Antwort hat mich auf dem richtigen Weg. Im aktuellen Programm wurde self
in ein 37 bis 140 MB-Objekt gebeizt, das an die Worker-Prozesse übergeben werden musste. Schlimmer noch, das Python-Beizen ist sehr langsam; Das Beizen selbst dauerte ein paar Sekunden, was für jeden Arbeitsteil passierte, der an die Arbeitsprozesse weitergegeben wurde. Anders als das Beizen und Übergeben von großen Datenobjekten ist der Overhead von apply_async
in Linux sehr klein; Für eine kleine Funktion (Hinzufügen einiger ganzzahliger Argumente) dauert es nur 0,2 ms pro 0 Paar. So ist die Aufteilung der Arbeit in sehr kleinen Brocken kein Problem für sich. Also übertrage ich alle großen Array-Argumente als Indizes zu globalen Variablen.Ich halte die Chunk-Größe klein für den Zweck der CPU-Cache-Optimierung.
Die globalen Variablen werden in einem globalen dict
gespeichert; Die Einträge werden sofort im übergeordneten Prozess gelöscht, nachdem der Worker-Pool eingerichtet wurde. Nur die Schlüssel zu dict
werden an den Arbeitsprozess übertragen. Die einzigen großen Daten für das Beizen/IPC sind die neuen Daten, die von den Mitarbeitern erstellt werden.
#!/usr/bin/python2.7
import numpy as np, sys
from multiprocessing import Pool
_mproc_data = {} # global storage for objects during multiprocessing.
class BigData:
def __init__(self, size):
self.blah = np.random.uniform(0, 1, size=size)
def do_chunk(self, k, xi, yi):
# do the work and return an array of the same shape as xi, yi
zi = k*np.ones_like(xi)
return zi
def do_all_work(self, xi, yi, num_proc):
global _mproc_data
mp_key = str(id(self))
_mproc_data['bd'+mp_key] = self # BigData
_mproc_data['xi'+mp_key] = xi
_mproc_data['yi'+mp_key] = yi
pool = Pool(processes=num_proc)
# processes have now inherited the global variabele; clean up in the parent process
for v in ['bd', 'xi', 'yi']:
del _mproc_data[v+mp_key]
# setup indices for the worker processes (placeholder)
n_chunks = 45
n = len(xi)
chunk_len = n//n_chunks
i1list = np.arange(0,n,chunk_len)
i2list = i1list + chunk_len
i2list[-1] = n
klist = range(n_chunks) # placeholder
procs = []
for i in range(n_chunks):
p = pool.apply_async(_do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]))
sys.stderr.write(".")
procs.append(p)
sys.stderr.write("\n")
# allocate space for combined results
zi = np.zeros_like(xi)
# get data from workers and finish
for i, p in enumerate(procs):
zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling
pool.close()
pool.join()
return zi
def _do_chunk_wrapper(key, i1, i2, k):
"""All arguments are small objects."""
global _mproc_data
bd = _mproc_data['bd'+key]
xi = _mproc_data['xi'+key][i1:i2]
yi = _mproc_data['yi'+key][i1:i2]
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
bd = BigData(int(1e7))
bd.do_all_work(xi, yi, 4)
Hier werden die Ergebnisse einer Geschwindigkeitstest sind (wiederum 2 Kerne, 4 threads), die Anzahl der Arbeitsprozesse unterschiedlicher und die Menge an Speicher in den Chunks (total Bytes des xi
, yi
, zi
Array-Scheiben). Die Zahlen sind in "Millionen Ergebniswerte pro Sekunde", aber das ist für den Vergleich nicht so wichtig. Die Zeile für "1 Prozess" ist ein direkter Aufruf an do_chunk
mit den vollen Eingabedaten, ohne irgendwelche Unterprozesse.
Der Einfluss der Datengröße im Speicher ist ziemlich signifikant. Die CPU verfügt über 3 MB gemeinsamen L3-Cache sowie 256 KB L2-Cache pro Kern. Beachten Sie, dass die Berechnung auch Zugriff auf mehrere MB interner Daten des Objekts BigData
benötigt. Was wir daraus lernen, ist also, dass es sinnvoll ist, diese Art von Geschwindigkeitstest durchzuführen. Für dieses Programm sind 2 Prozesse am schnellsten, gefolgt von 4 und 3 am langsamsten.
Abgesehen: Haben Sie sich in [dask] (http: //dask.pydata .org/de/letzte /)? Es kann viele Ihrer Multiprocessing-Aufgaben vereinfachen. –
@ajcr Noch nicht. Aber jetzt möchte ich die Anzahl der externen Abhängigkeiten minimieren, da ich diese möglicherweise auf einem Server ausführen muss, auf dem ich keine Administratorrechte habe, und ihn mit anderen Leuten teilen kann, die die gleiche Einschränkung haben. –
Auch das macht mir Angst: "Dask-Arrays implementieren eine ** Teilmenge ** der NumPy-Schnittstelle auf großen Arrays". Das klingt nach einer Menge potentieller Arbeit, um mit existierendem Code zu interagieren. –