Ich analysiere eine cvs-Datei Daten Zeile für Zeile. Für jede Zeile erzeuge ich eine Zeichenfolge, die die analysierten Ergebnisse speichert.Python Multiprozessing und speichern Sie Daten in der Reihenfolge
Da die Datei sehr groß ist, muss ich Multiprocessing durchführen. Aber weil ich auf die Ergebnisse über ihren Index zugreifen muss, muss ich sie in einer Reihenfolge speichern (in einer lokalen Datei).
Ein Weg, den ich versuchte und arbeitete, ist das Sperren, aber es ist immer noch sehr langsam. Wenn ich es nicht gesperrt habe, läuft es schnell, aber die Daten werden durcheinander gebracht.
Was ich tun möchte, ist, diese Ergebnisse zu einer globalen Liste zu speichern. Und ich kann in eine lokale Datei schreiben, wenn alle Subprozesse abgeschlossen sind. Hinweise, wie man es ohne Schloss macht und wie man es beschleunigt?
Der folgende sind mein Multiprozessing Teilcode:
def worker(dat,fileName,l):
l.acquire()
target = open(fileName,"a")
for values in dat:
# recursively apply different start mean2, find best solution
model = MixGaussian(values)
bf = model.findBestFit()
# find the value where it's equally probable belongs to two gaussians
m1 = bf[0]
m2 = bf[1]
d1 = bf[2]
d2 = bf[3]
# calculate x
k = math.log((d1+0.001)/(d2+0.001))* d1 * d2
a = d1 -d2
b = 2 * (m1*d2 - m2 * d1)
c = m2 * m2 * d1 - m1* m1 * d2 - k
delta = -1 * math.sqrt(b*b - 4 * a * c)
if a == 0:
a += 0.01
x = (-1 * b + delta)/(2 * a)
bf.append(x)
print bf
target.write(",".join(str(ele) for ele in bf))
target.write("\n")
target.close()
l.release()
if __name__ == "__main__":
# read from line 8000 to 8999
data = readFile("unc_expr.tsv",8000,9000)
target = open("modelstest9.csv","w")
target.write("mean_1,mean_2,deviantion_1,deviation_2,cross_value")
target.write("\n")
target.close()
numPrcs = 16
d = []
for i in range(numPrcs-1):
d.append(data[i*len(data)/numPrcs:(i+1) *len(data)/numPrcs])
d.append(data[(numPrcs-1)*len(data)/numPrcs:])
start_time = time.time()
lock = Lock()
print("start time: %s"%start_time)
for i in range(numPrcs):
Process(target=worker,args=(d[i],"modelstest9.csv",lock)).start()
Dank !!
Der Schutz des gesamten Worker-Codes mit einer Sperre hat den gleichen Effekt, als würden Sie überhaupt keinen Multiprocessing verwenden. Sie erzwingen, dass alle Aufgaben seriell ausgeführt werden. – mata
Yep ich erkannte es und danke für die Notiz. Ich habe stattdessen den Schreibprozess gesperrt, aber er schreibt die Ergebnisse des schnellsten Unterprozesses. –