Ich möchte ein peinlich paralleles Problem im Zusammenhang mit Bayesian Inference beschleunigen. Ziel ist es, für eine Menge von Bildern x bei einer gegebenen Matrix A Koeffizienten u abzuleiten, so dass X = A * U. X hat die Dimensionen mxn, A mxp und U pxn. Für jede Spalte von X muss man die optimale entsprechende Spalte der Koeffizienten U ableiten. Am Ende wird diese Information verwendet, um A zu aktualisieren. Ich benutze m = 3000, p = 1500 und n = 100. Also, wie es ist ein lineares Modell, die Inferenz der Koeffizientenmatrix besteht aus n unabhängigen Berechnungen. Daher habe ich versucht, mit dem Multiprocessing-Modul von Python zu arbeiten, aber es gibt keine Beschleunigung.Warum gibt es keine Beschleunigung bei der Verwendung von Pythons-Multiprocessing für peinlich parallele Probleme innerhalb einer For-Schleife mit gemeinsamen numpigen Daten?
Hier ist, was ich getan habe:
Die Hauptstruktur, ohne Parallelisierung ist:
import numpy as np
from convex import Crwlasso_cd
S = np.empty((m, batch_size))
for t in xrange(start_iter, niter):
## Begin Warm Start ##
# Take 5 gradient steps w/ this batch using last coef. to warm start inf.
for ws in range(5):
# Initialize the coefficients
if ws:
theta = U
else:
theta = np.dot(A.T, X)
# Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
# Crwlasso_cd is the function that does the inference per data sample
# It's basically a C-inline code
for k in range(batch_size):
U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())
# Given the inferred coefficients, update and renormalize
# the basis functions A
dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
A += (eta/batch_size) * dA1
A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))
Implementierung von Multiprocessing:
Ich versuchte Multiprozessing zu implementieren. Ich habe eine 8-Kern-Maschine, die ich verwenden kann.
- Es gibt 3 For-Schleifen. Die einzige, die „parallelisierbaren“ ist der dritte, in dem die Koeffizienten abgeleitet zu sein scheint:
- Generieren einer Queue und stapeln die Iteration-Zahlen von 0 bis batch_size-1 in die Warteschlange
- Generieren 8 verarbeitet, und lassen sie sie durch die Queue arbeiten
- Teilen sie die Daten U mit multiprocessing.Array
So ersetzte ich diese dritte Schleife mit den folgenden:
Hierist die Klasse Wrap_mp:
class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""
def __init__(self, arr):
""" Initialize a shared array from a numpy array.
The data is copied.
"""
self.data = ndarray_to_shmem(arr)
self.dtype = arr.dtype
self.shape = arr.shape
def __array__(self):
""" Implement the array protocole.
"""
arr = shmem_as_ndarray(self.data, dtype=self.dtype)
arr.shape = self.shape
return arr
def asarray(self):
return self.__array__()
Und hier ist die Funktion infer_coefficients_mp:
def infer_feature_coefficients_mp(work_queue,U_mp,A,X):
while True:
try:
index = work_queue.get(block=False)
x = X[:,index]
U = U_mp.asarray()
theta = np.dot(phit,x)
# Infer the coefficients of the column index
U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())
except Empty:
break
Das Problem sind nun folgendes:
- Die Multiprocessing-Version ist nicht schneller als die Single-Thread-Version für die angegebenen Dimensionen der Daten.
- Die Prozess-ID steigt mit jeder Iteration. Bedeutet dies, dass ständig ein neuer Prozess generiert wird? Führt das nicht zu einem enormen Overhead? Wie kann ich das vermeiden? Gibt es die Möglichkeit, innerhalb der gesamten for-Schleife 8 verschiedene Prozesse zu erstellen und diese einfach mit den Daten zu aktualisieren?
- Macht die Art, wie ich die Koeffizienten U unter den Prozessen teile, die Berechnung langsamer? Gibt es einen anderen, besseren Weg, dies zu tun?
- Wäre ein Pool von Prozessen besser?
Ich bin wirklich dankbar für jede Art von Hilfe!Ich habe vor einem Monat begonnen, mit Python zu arbeiten, und bin jetzt ziemlich verloren.
Engin
Muss die Arbeitsplanung wirklich einzeln eingereicht werden? Wäre es nicht fair, mehrere Arbeitseinheiten pro Kern im Voraus zu planen? Ich würde vermuten, dass, wenn Sie sehr wenig Verbesserung durch Prozess-Pooling sehen, dass viel Zeit in Sperrkonflikt innerhalb der Prozess-Warteschlange verbracht wird. – cjhanks