2010-01-17 8 views
45

Ich habe Probleme mit dem Multiprocessing-Modul. Ich verwende einen Pool von Arbeitern mit seiner Kartenmethode, um Daten aus vielen Dateien zu laden, und für jeden von ihnen analysiere ich Daten mit einer benutzerdefinierten Funktion. Jedes Mal, wenn eine Datei verarbeitet wurde, möchte ich einen Zähler aktualisieren lassen, damit ich nachvollziehen kann, wie viele Dateien noch verarbeitet werden müssen. Hier ist Beispielcode:Python Multiprocessing und ein gemeinsamer Zähler

def analyze_data(args): 
    # do something 
    counter += 1 
    print counter 


if __name__ == '__main__': 

    list_of_files = os.listdir(some_directory) 

    global counter 
    counter = 0 

    p = Pool() 
    p.map(analyze_data, list_of_files) 

ich keine Lösung für diese finden können.

Antwort

48

Das Problem ist, dass die counter Variable nicht zwischen Ihren Prozessen geteilt wird: jeder separate Prozess erstellt seine eigene lokale Instanz und inkrementiert diese.

Weitere Informationen finden Sie in der Dokumentation für einige Techniken, die Sie verwenden können, um den Status zwischen Ihren Prozessen zu teilen. In Ihrem Fall möchten Sie vielleicht eine Value Instanz zwischen Ihren Mitarbeitern freigeben

Hier ist eine funktionierende Version Ihres Beispiels (mit einigen Dummy-Eingabedaten). Beachten Sie es globale Werte verwendet, die ich wirklich in der Praxis zu vermeiden versuchen würde:

from multiprocessing import Pool, Value 
from time import sleep 

counter = None 

def init(args): 
    ''' store the counter for later use ''' 
    global counter 
    counter = args 

def analyze_data(args): 
    ''' increment the global counter, do something with the input ''' 
    global counter 
    # += operation is not atomic, so we need to get a lock: 
    with counter.get_lock(): 
     counter.value += 1 
    print counter.value 
    return args * 10 

if __name__ == '__main__': 
    #inputs = os.listdir(some_directory) 

    # 
    # initialize a cross-process counter and the input lists 
    # 
    counter = Value('i', 0) 
    inputs = [1, 2, 3, 4] 

    # 
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    # 
    p = Pool(initializer = init, initargs = (counter,)) 
    i = p.map_async(analyze_data, inputs, chunksize = 1) 
    i.wait() 
    print i.get() 
+0

Große Antwort! Ich hatte das gleiche Problem in IronPython, und während multiprocessing.Value nicht verfügbar ist, können Sie etwas ähnliches mit clr.Reference und System.Threading.Interlocked tun: http://StackOverflow.com/Questions/2255461/how-to-atomical- increment-a-static-Mitglied-in-ironpython/2314858 # 2314858 –

+3

@jkp, wie würdest du es ohne die globale Variable tun? - Ich versuche eine Klasse zu benutzen, aber es ist nicht so einfach wie es scheint. Siehe http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemeth-when-pythons-multiprocessing-pool-ma – Anna

+18

Leider scheint dieses Beispiel fehlerhaft zu sein, da 'counter.value + = 1 'ist nicht atomar zwischen Prozessen, so wird der Wert falsch sein, wenn er lange genug mit ein paar Prozessen läuft –

24

Klasse Counter ohne Renn Zustand Fehler:

class Counter(object): 
    def __init__(self): 
     self.val = multiprocessing.Value('i', 0) 

    def increment(self, n=1): 
     with self.val.get_lock(): 
      self.val.value += n 

    @property 
    def value(self): 
     return self.val.value 
+0

Für ähnlichen Code, der mit 'joblib'' Parallel' (der Code in dieser Antwort funktioniert nicht mit 'joblib'), siehe https://github.com/davidheryanto/etc/blob/master/python-recipes/parallel-joblib-counter.py –

0

Schnelle Zähler Klasse ohne die eingebaute Sperre von Wert zweimal

class Counter(object): 
    def __init__(self, initval=0): 
     self.val = multiprocessing.RawValue('i', initval) 
     self.lock = multiprocessing.Lock() 

    def increment(self): 
     with self.lock: 
      self.val.value += 1 

    @property 
    def value(self): 
     return self.val.value 

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue