2010-12-07 9 views
15

Ich möchte ein peinlich paralleles Problem im Zusammenhang mit Bayesian Inference beschleunigen. Ziel ist es, für eine Menge von Bildern x bei einer gegebenen Matrix A Koeffizienten u abzuleiten, so dass X = A * U. X hat die Dimensionen mxn, A mxp und U pxn. Für jede Spalte von X muss man die optimale entsprechende Spalte der Koeffizienten U ableiten. Am Ende wird diese Information verwendet, um A zu aktualisieren. Ich benutze m = 3000, p = 1500 und n = 100. Also, wie es ist ein lineares Modell, die Inferenz der Koeffizientenmatrix besteht aus n unabhängigen Berechnungen. Daher habe ich versucht, mit dem Multiprocessing-Modul von Python zu arbeiten, aber es gibt keine Beschleunigung.Warum gibt es keine Beschleunigung bei der Verwendung von Pythons-Multiprocessing für peinlich parallele Probleme innerhalb einer For-Schleife mit gemeinsamen numpigen Daten?

Hier ist, was ich getan habe:

Die Hauptstruktur, ohne Parallelisierung ist:

import numpy as np 
from convex import Crwlasso_cd 

S = np.empty((m, batch_size)) 

for t in xrange(start_iter, niter): 

    ## Begin Warm Start ## 
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf. 
    for ws in range(5): 
     # Initialize the coefficients 
     if ws: 
      theta = U 
     else: 
      theta = np.dot(A.T, X) 

     # Infer the Coefficients for the given data batch X of size mxn (n=batch_size) 
     # Crwlasso_cd is the function that does the inference per data sample 
     # It's basically a C-inline code 
     for k in range(batch_size): 
      U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy()) 

     # Given the inferred coefficients, update and renormalize 
     # the basis functions A 
     dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood 
     A += (eta/batch_size) * dA1 
     A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0)))) 

Implementierung von Multiprocessing:

Ich versuchte Multiprozessing zu implementieren. Ich habe eine 8-Kern-Maschine, die ich verwenden kann.

  1. Es gibt 3 For-Schleifen. Die einzige, die „parallelisierbaren“ ist der dritte, in dem die Koeffizienten abgeleitet zu sein scheint:
    • Generieren einer Queue und stapeln die Iteration-Zahlen von 0 bis batch_size-1 in die Warteschlange
    • Generieren 8 verarbeitet, und lassen sie sie durch die Queue arbeiten
  2. Teilen sie die Daten U mit multiprocessing.Array

So ersetzte ich diese dritte Schleife mit den folgenden:

Hier

ist die Klasse Wrap_mp:

class Wrap_mp(object): 
""" Wrapper around multiprocessing.Array to share an array across 
    processes. Store the array as a multiprocessing.Array, but compute with it 
as a numpy.ndarray 
""" 

    def __init__(self, arr): 
     """ Initialize a shared array from a numpy array. 

      The data is copied. 
     """ 
     self.data = ndarray_to_shmem(arr) 
     self.dtype = arr.dtype 
     self.shape = arr.shape 

    def __array__(self): 
     """ Implement the array protocole. 
     """ 
     arr = shmem_as_ndarray(self.data, dtype=self.dtype) 
     arr.shape = self.shape 
     return arr 

    def asarray(self): 
     return self.__array__() 

Und hier ist die Funktion infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X): 

    while True: 
     try: 
      index = work_queue.get(block=False) 
      x = X[:,index] 
      U = U_mp.asarray() 
      theta = np.dot(phit,x) 

      # Infer the coefficients of the column index 
      U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy()) 

     except Empty: 
      break 

Das Problem sind nun folgendes:

  1. Die Multiprocessing-Version ist nicht schneller als die Single-Thread-Version für die angegebenen Dimensionen der Daten.
  2. Die Prozess-ID steigt mit jeder Iteration. Bedeutet dies, dass ständig ein neuer Prozess generiert wird? Führt das nicht zu einem enormen Overhead? Wie kann ich das vermeiden? Gibt es die Möglichkeit, innerhalb der gesamten for-Schleife 8 verschiedene Prozesse zu erstellen und diese einfach mit den Daten zu aktualisieren?
  3. Macht die Art, wie ich die Koeffizienten U unter den Prozessen teile, die Berechnung langsamer? Gibt es einen anderen, besseren Weg, dies zu tun?
  4. Wäre ein Pool von Prozessen besser?

Ich bin wirklich dankbar für jede Art von Hilfe!Ich habe vor einem Monat begonnen, mit Python zu arbeiten, und bin jetzt ziemlich verloren.

Engin

+0

Muss die Arbeitsplanung wirklich einzeln eingereicht werden? Wäre es nicht fair, mehrere Arbeitseinheiten pro Kern im Voraus zu planen? Ich würde vermuten, dass, wenn Sie sehr wenig Verbesserung durch Prozess-Pooling sehen, dass viel Zeit in Sperrkonflikt innerhalb der Prozess-Warteschlange verbracht wird. – cjhanks

Antwort

5

Jedes Mal, wenn Sie einen Prozess erstellen Sie einen neuen Prozess. Wenn Sie dies innerhalb Ihrer for-Schleife tun, dann ja, Sie starten jedes Mal neue Prozesse durch die Schleife. Es klingt wie das, was Sie tun möchten, ist Ihre Queue und Prozesse außerhalb der Schleife zu initialisieren, und füllen Sie dann die Warteschlange innerhalb der Schleife.

Ich habe multiprocessing.Pool zuvor verwendet, und es ist nützlich, aber es bietet nicht viel über das, was Sie bereits mit einer Warteschlange implementiert haben.

+0

Vielen Dank für Ihre Antwort! Aber wie können Sie tatsächlich Prozesse außerhalb der Schleife erstellen und dann nur die Variablen aktualisieren, die sie gegeben haben, und die Ergebnisse ohne p.join() synchronisieren? Weil die Join-Funktion den Prozess schließt, oder? –

+1

p.join wartet einfach auf das Beenden des Prozesses, was es tun würde, wenn es sys.exit aufruft oder von der Funktion zurückkehrt. Sobald Sie den Prozess starten, wird er parallel zum Hauptprozess ausgeführt. Wenn die Worker-Prozesse work_queue.get() aufrufen, werden sie blockiert, bis sie einen Eintrag in der Arbeitswarteschlange haben. Wenn Sie work_queue.put() im Hauptprozess aufrufen, füttern Sie Arbeit in die Warteschlange, und Sie können das weiter tun, bis sie erledigt sind. –

+0

Entschuldigung für das Reasking, aber nur um es klar zu sagen: Ich initialisiere die Prozesse außerhalb der Hauptschleife mit processes = [Process (...) for ...] und innerhalb der Schleife, sobald ich gefüllt habe die Warteschlange zum ersten Mal, starten Sie die Prozesse nur einmal, lasse ich sie durch die Warteschlange arbeiten, bis es leer ist. Dann fülle ich die Warteschlange in der zweiten Iteration erneut. Jetzt arbeiten die Prozesse automatisch durch die Warteschlange, ohne dass ich sie erneut aktivieren muss, und sie aktualisieren die Objekte mit gemeinsam genutztem Speicher? –

3

Schließlich läuft alles auf eine Frage hinaus: Ist es möglich, Prozesse außerhalb der Haupt-for-Schleife zu starten, und für jede Iteration die aktualisierten Variablen in ihnen zu füttern, sie die Daten verarbeiten und die neu sammeln Daten aus allen Prozessen berechnet, ohne jede Iteration neue Prozesse starten zu müssen?

Verwandte Themen