Ok, ich werde dies als eine Antwort erweitern.
Der springende Punkt einer multiprocessing.Pool
ist für sie eine Reihe von Prozessen zu spawnen und dann die Arbeit über sie in einer First-Free-First-Tasked-Art zu verteilen. Das heißt, wenn Sie n
Artikel zu verarbeiten und p
Anzahl der Prozesse in Ihrem Pool haben, wählen Sie (oder p * chunksize
wenn chunksize
definiert ist) Anzahl der Elemente und senden Sie jedes der Elemente an einen separaten Prozess für die Verarbeitung. Sobald ein Prozess die Verarbeitung eines Elements beendet hat und effektiv freigegeben wurde, wird der Pool das nächste Element abholen, es an den freigegebenen Prozess senden und so weiter, bis weitere Elemente nicht mehr vorhanden sind. Dies gewährleistet eine optimale Nutzung Ihrer erstellten Prozesse, ohne dass Sie die Distribution selbst verwalten müssen. Dies bedeutet auch, dass multiprocessing.Pool
nicht für jede Situation geeignet ist. In Ihrem Fall, basierend auf dem vorgestellten Code, möchten Sie Ihr iterable gleichmäßig über eine feste Anzahl von Prozessen aufteilen, so dass der Pool nur ein Overhead wäre - es werden keine weiteren Daten mehr verteilt, wenn ein Prozess beendet ist. Wenn Sie nur die Daten teilen möchten und jeden Brocken zu einem anderen Prozess senden ist es so einfach wie:
import multiprocessing
if __name__ == "__main__": # always guard your multiprocessing code
cores = max(multiprocessing.cpu_count() - 1, 1) # ensure at least one process
works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
chunk_size = (len(works) + cores - 1) // cores # rough chunk size estimate
processes = [] # a simple list to hold our process references
for i in range(cores):
work_set = works[i*chunk_size:(i+1)*chunk_size]
process = multiprocessing.Process(target=get_hit_rules, args=(work_set,))
process.start()
processes.append(process)
results = [process.join() for process in processes] # get the data back
Dies wird genau das tun, was Sie zu tun versucht haben - eine cpu_count()
der Prozesse initiieren und senden jeweils eine (grob gesagt, der letzte Prozess wird ein bisschen weniger Daten auf durchschnittlich.) gleichmäßig großen Stück Ihrer Daten in einer Weise, dass alle Ihre Daten auf einmal parallel verarbeitet werden.
Natürlich, wenn Ihre Daten zu groß ist, wie Sie zusätzlich im Kommentar geklärt haben diese unüberschaubaren am Ende wird und dann können Sie multiprocessing.Pool
wieder zurück überschaubare Einheiten Ihrer Daten zu den erzeugten Prozessen senden zu verarbeiten in eine Reihe. Darüber hinaus ist der Aufbau der works
Liste auch sinnlos - warum möchten Sie eine Liste mit Milliarden von Elementen erstellen, die Sie bereits die Daten in Ihrem sampled_patterns
dict haben?
Warum nicht einzelne Artikel von Ihrem sampled_patterns
dict senden, anstatt eine Zwischenliste zu erstellen, nur so können Sie es auf die multiprocessing.Pool
zuordnen?Um dies zu tun, alles was Sie brauchen ist eine Art eines Iterators Slicer zu erstellen und ihn an multiprocessing.Pool.imap
statt und lassen Sie den Pool, den Rest intern verwalten, so:
import multiprocessing
def patterns_slicer(patterns, size, hit_rates):
pos = 0 # store our current position
patterns = patterns.items() # use the items iterator
while pos < len(patterns):
yield [(p, s, hit_rates) for p, s in patterns[pos:pos+size]]
pos += size
if __name__ == "__main__": # always guard your multiprocessing code
cores = max(multiprocessing.cpu_count() - 1, 1) # ensure at least one process
pool = multiprocessing.Pool(processes=cores)
# lets use chunks of 100 patterns each
results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates))
Natürlich multiprocessing.Pool.imap
macht eine Menge Wenn also Ihre Originaldaten zu groß sind oder Sie riesige Teile verwenden möchten, sollten Sie die Implementierung Ihres eigenen imap
mit Just-in-Time-Datenabruf in Erwägung ziehen. Überprüfen Sie this answer für ein Beispiel.
'chunksize' macht nicht das, was Sie gerade tun - setzen Sie es auf die Anzahl der Prozesse in Ihrem Pool (dh' pool._processes'), wenn Sie die iterierbaren 'work's in gleich große Teile aufteilen wollen der Prozesse im Pool. Tho, wenn du das willst, ist die wirkliche Frage, warum brauchst du überhaupt einen 'Pool'? – zwer
Danke. Dies ist mein erster Code, der Multiprocessing verwendet. Ich benutze Pool, weil der Code weniger beängstigend wirkt als viele Prozesse. Ich dachte, Pool würde sich darum kümmern. Gerade jetzt lesen auf Pool vs Prozess Gibt es einen besseren Weg? – Raja
Ich werde Millionen haben, wenn nicht Milliarden von Gegenständen in "funktioniert". Also denke ich, "Pool" ist besser geeignet, als so viele "Prozesse" zu erzeugen. – Raja