2016-06-08 14 views
0

Ich analysiere eine cvs-Datei Daten Zeile für Zeile. Für jede Zeile erzeuge ich eine Zeichenfolge, die die analysierten Ergebnisse speichert.Python Multiprozessing und speichern Sie Daten in der Reihenfolge

Da die Datei sehr groß ist, muss ich Multiprocessing durchführen. Aber weil ich auf die Ergebnisse über ihren Index zugreifen muss, muss ich sie in einer Reihenfolge speichern (in einer lokalen Datei).

Ein Weg, den ich versuchte und arbeitete, ist das Sperren, aber es ist immer noch sehr langsam. Wenn ich es nicht gesperrt habe, läuft es schnell, aber die Daten werden durcheinander gebracht.

Was ich tun möchte, ist, diese Ergebnisse zu einer globalen Liste zu speichern. Und ich kann in eine lokale Datei schreiben, wenn alle Subprozesse abgeschlossen sind. Hinweise, wie man es ohne Schloss macht und wie man es beschleunigt?

Der folgende sind mein Multiprozessing Teilcode:

def worker(dat,fileName,l): 
    l.acquire() 
    target = open(fileName,"a") 
    for values in dat: 
     # recursively apply different start mean2, find best solution 
     model = MixGaussian(values) 
     bf = model.findBestFit() 
     # find the value where it's equally probable belongs to two gaussians 
     m1 = bf[0] 
     m2 = bf[1] 
     d1 = bf[2] 
     d2 = bf[3] 

     # calculate x 
     k = math.log((d1+0.001)/(d2+0.001))* d1 * d2 
     a = d1 -d2 
     b = 2 * (m1*d2 - m2 * d1) 
     c = m2 * m2 * d1 - m1* m1 * d2 - k 
     delta = -1 * math.sqrt(b*b - 4 * a * c) 
     if a == 0: 
      a += 0.01 
     x = (-1 * b + delta)/(2 * a) 
     bf.append(x) 
     print bf 

     target.write(",".join(str(ele) for ele in bf)) 
     target.write("\n") 

    target.close() 
    l.release() 

if __name__ == "__main__": 
    # read from line 8000 to 8999 
    data = readFile("unc_expr.tsv",8000,9000) 
    target = open("modelstest9.csv","w") 
    target.write("mean_1,mean_2,deviantion_1,deviation_2,cross_value") 
    target.write("\n") 
    target.close() 

    numPrcs = 16 
    d = [] 
    for i in range(numPrcs-1): 
     d.append(data[i*len(data)/numPrcs:(i+1) *len(data)/numPrcs]) 
    d.append(data[(numPrcs-1)*len(data)/numPrcs:]) 

    start_time = time.time() 
    lock = Lock() 
    print("start time: %s"%start_time) 

    for i in range(numPrcs):  
     Process(target=worker,args=(d[i],"modelstest9.csv",lock)).start() 

Dank !!

+0

Der Schutz des gesamten Worker-Codes mit einer Sperre hat den gleichen Effekt, als würden Sie überhaupt keinen Multiprocessing verwenden. Sie erzwingen, dass alle Aufgaben seriell ausgeführt werden. – mata

+0

Yep ich erkannte es und danke für die Notiz. Ich habe stattdessen den Schreibprozess gesperrt, aber er schreibt die Ergebnisse des schnellsten Unterprozesses. –

Antwort

0

ThreadPoolExecuter ist ideal, um die gleiche Methode parallel mit verschiedenen Datensätzen auszuführen und die Ergebnisse von jedem Thread wieder zusammenzusetzen.

Wenn die Ergebnisse von jedem Thread zurückkommen, fügen Sie sie in eine Liste ein wie [(index1, result1), (index2, result2),...], wenn sie zurückkommen, sortieren Sie die Liste nach dem Index, wenn alle Threads abgeschlossen sind, und schreiben Sie die sortierte Liste in eine Datei.

Der Haken hier ist, dass dies alle Ihre Ergebnisse im Speicher hält, aber Sie sollten dies tun können, da Sie sie bereits in jedem Ihrer Prozesse im Speicher halten.

+0

Wenn die Prozesse CPU-gebunden sind, möchten Sie mit 'ProcessPoolExecutor' GIL-Probleme umgehen (für die Sie auch 'Multiprocessing' verwenden). – ShadowRanger

2

Ich würde vorschlagen, eine multiprocessing.Pool verwenden und ihre imap Methode, um die Arbeit zu tun. Lassen Sie die Arbeiter return Werte eingeben, anstatt sie direkt zu schreiben, wobei der Hauptprozess die gesamte E/A ausführt. imap garantiert, dass Sie die Ergebnisse in der Reihenfolge erhalten, in der die Aufgaben gesendet wurden, und da nur der Hauptprozess E/A ausführt, sind Konflikte nicht möglich.

Es ist auch eine Verbesserung, weil Sie Ihre Arbeit in feste Stücke teilen können, anstatt die Arbeit sorgfältig auf die Anzahl der Prozesse zu verteilen, die Sie starten möchten. multiprocessing.Pool erstellt standardmäßig eine Anzahl von Arbeitern, die Ihren CPU-Kernen entsprechen (Sie müssen also die Anzahl der Worker nicht manuell angeben und riskieren zu wenig, um Kerne zu verschwenden oder zu viel Zeit für Kontextwechsel zu verschwenden). Und map/imap und Unternehmen wird X Arbeitsaufgaben nahtlos unter N Arbeiter teilen, ohne sicherzustellen, dass die Anzahl der Arbeitsaufgaben gleich der Anzahl der Arbeitsaufgaben ist.

+0

Wie fügt man hier imap() Argumente hinzu? Ich verstehe imap() Methoden nicht wirklich. Danke für die Hilfe! –

+0

@AlexWang: Sie definieren die Funktion als 'def worker (allargs):' mit der ersten Zeile 'dat, fileName, l = allars', um sie zu entpacken. Dann erstellen Sie einfach einen Generator, der Drei-Tupel erzeugt, und übergeben das: 'für das Ergebnis in pool.imap (worker, generatoroftuples):' – ShadowRanger

Verwandte Themen