Das Problem ist, dass Sie falsch verstanden haben, wie die Karte funktioniert. Von the doc:
map(func, iterable[, chunksize])
Diese Methode hackt die iterable in eine Reihe von Stücken, die es zum Prozesspool als getrennte Aufgaben einreicht. Die ungefähre Größe dieser Chunks kann angegeben werden, indem chunksize auf eine positive ganze Zahl gesetzt wird.
Als iterierbar geben Sie eine Liste mit nur einem Element an: das Tupel (df, ...)
. Aber Sie müssten eine iterable mit vielen Elementen bereitstellen. Um diese Arbeit zu machen, würden Sie zu benötigen die Liste vorbereiten erste und nur dann an die Prozesse senden (Tipp: Sie können einfach Pool()
schreiben und lassen Python die Anzahl der Kerne selbst herausfinden)
pool = Pool()
chunk_index = 1
list = []
for df in pd.read_csv(downloaded_file,
chunksize=chunksize,
compression='gzip',
skipinitialspace=True,
encoding='utf-8'):
output_file_name = output_path +
merchant['output_file_format'].format(file_index, chunk_index)
list.append((df, transformer, output_file_name)])
chunk_index += 1
pool.map(wrapper_process, list)
Aber jetzt haben Sie das Problem, dass Sie die vollen CSV-Daten in Speicher halten müssen, die könnte in Ordnung sein, ist aber in der Regel nicht. Zu kommen, um dieses Problem Sie können wechseln, um eine Warteschlange mit: Sie würden
- aufbauen eine leere Warteschlange
- die Prozesse beginnen und ihnen sagen, Elemente aus der Warteschlange zu bekommen (was am Anfang noch leer ist)
- die Warteschlange mit dem Hauptprozess füttern (und vielleicht auch überprüfen, dass die Warteschlange nicht zu lange wird immer so den Speicherverbrauch nicht in das Dach geht)
- ein in die Warteschlange
STOP
Element setzen, so dass die Prozesse selbst beenden
Es gibt ein gutes Beispiel in the official doc (look at the last example on the page), das erklärt, dass Sie sich dem annähern würden.
Ein letztes Wort: Sind Sie sicher, dass Ihre Operation CPU-gebunden ist? Machst du eine Menge Verarbeitung in wrapper_process
(und möglicherweise auch transformer
)? Weil, wenn Sie nur die CSV in separaten Dateien ohne viel Verarbeitung teilen Ihr Programm ist IO-gebunden und nicht CPU gebunden und dann würde die Multiprozessing keinen Sinn machen.
Ich habe gerade einen dummen Fehler in meinem Beispiel unten behoben. Konntest du es testen? – hansaplast