2017-12-16 1 views
0

enter image description herePython Multiprocessing mit Pandas nicht alle Prozess läuft auf einmal

Ich bin ein csv in Brocken zu lesen und das Bestehen der Brocken auf einen Pool von 4 Prozessen.

Mit diesem Code mein Verständnis ist es sollte mir zeigen 4 Prozess läuft kontinuierlich. Aber im Screenshot von htop unten ist es immer 2 running. Einer ist htop Befehl es selbst. Es bedeutet, dass nur 1 Python-Prozess zu der Zeit ausgeführt wird.

enter image description here Von der Speichernutzung Es ist 12 GB, die ich denke nur möglich sein wird, wenn die vier Stücke im Speicher 1 chunk 2gb

fast vorgesehen geladen ist, wie ich auf einmal für Prozessoren verwenden kann .

+0

Ich habe gerade einen dummen Fehler in meinem Beispiel unten behoben. Konntest du es testen? – hansaplast

Antwort

0

Das Problem ist, dass Sie falsch verstanden haben, wie die Karte funktioniert. Von the doc:

map(func, iterable[, chunksize]) Diese Methode hackt die iterable in eine Reihe von Stücken, die es zum Prozesspool als getrennte Aufgaben einreicht. Die ungefähre Größe dieser Chunks kann angegeben werden, indem chunksize auf eine positive ganze Zahl gesetzt wird.

Als iterierbar geben Sie eine Liste mit nur einem Element an: das Tupel (df, ...). Aber Sie müssten eine iterable mit vielen Elementen bereitstellen. Um diese Arbeit zu machen, würden Sie zu benötigen die Liste vorbereiten erste und nur dann an die Prozesse senden (Tipp: Sie können einfach Pool() schreiben und lassen Python die Anzahl der Kerne selbst herausfinden)

pool = Pool() 
chunk_index = 1 
list = [] 
for df in pd.read_csv(downloaded_file, 
     chunksize=chunksize, 
     compression='gzip', 
     skipinitialspace=True, 
     encoding='utf-8'): 
    output_file_name = output_path + 
     merchant['output_file_format'].format(file_index, chunk_index) 
    list.append((df, transformer, output_file_name)]) 
    chunk_index += 1 
pool.map(wrapper_process, list) 

Aber jetzt haben Sie das Problem, dass Sie die vollen CSV-Daten in Speicher halten müssen, die könnte in Ordnung sein, ist aber in der Regel nicht. Zu kommen, um dieses Problem Sie können wechseln, um eine Warteschlange mit: Sie würden

  • aufbauen eine leere Warteschlange
  • die Prozesse beginnen und ihnen sagen, Elemente aus der Warteschlange zu bekommen (was am Anfang noch leer ist)
  • die Warteschlange mit dem Hauptprozess füttern (und vielleicht auch überprüfen, dass die Warteschlange nicht zu lange wird immer so den Speicherverbrauch nicht in das Dach geht)
  • ein in die Warteschlange STOP Element setzen, so dass die Prozesse selbst beenden

Es gibt ein gutes Beispiel in the official doc (look at the last example on the page), das erklärt, dass Sie sich dem annähern würden.

Ein letztes Wort: Sind Sie sicher, dass Ihre Operation CPU-gebunden ist? Machst du eine Menge Verarbeitung in wrapper_process (und möglicherweise auch transformer)? Weil, wenn Sie nur die CSV in separaten Dateien ohne viel Verarbeitung teilen Ihr Programm ist IO-gebunden und nicht CPU gebunden und dann würde die Multiprozessing keinen Sinn machen.

+0

Dies funktioniert für kleine Daten. Mein ein Stück ist 2 GB und es ist nicht realistisch, 24 GB Daten in meinem Speicher zu halten. Irgendwelche Arten, die dies klar sagen, wie Karte funktioniert, so dass ich akzeptiere, ist eine korrekte Lösung. Ich habe es mit 'apply_aysnc' geschafft und gezwungen, Resposne zu bekommen, sobald mein Poollimit erreicht ist. Vielen Dank –

Verwandte Themen