1

Ich versuche, alle meine Jobs zwischen 16 Prozessoren gleichmäßig mit Pool zu verteilen. Was ich festgestellt habe, ist, dass zunächst 16 Prozesse entstehen. Nach ein paar Sekunden führen nur 2 Prozesse alle verbleibenden Jobs für eine kleine Anzahl von Jobs aus. Egal, wie viel ich die Last erhöhe, es scheint, dass die Anzahl der Prozesse, die daran arbeiten, sinkt. Schließlich durchlaufen nur 1 oder 2 Prozesse die verbleibenden Jobs.Multiprocessing verteilt Jobs nicht gleichmäßig - Python

Hier ist das Multiprocessing-Snippet aus meinem Code.

Gibt es trotzdem alle 16 Prozessoren zu verwenden, um die Parallelisierung zu maximieren? Vielen Dank!

Bearbeiten! So werden die Aufgaben verteilt. Zähler mit PID als Schlüssel und Anzahl der Aufgaben als Wert.

Counter({30179: 14130, 30167: 13530, 30169: 12900, 30173: 12630, 30165: 12465, 30177: 12105, 30163: 11820, 30175: 11460, 30161: 10860, 30181: 10725, 30183: 9855, 30157: 8695, 30159: 6765, 30171: 4860, 30155: 1770}) 
+0

'chunksize' macht nicht das, was Sie gerade tun - setzen Sie es auf die Anzahl der Prozesse in Ihrem Pool (dh' pool._processes'), wenn Sie die iterierbaren 'work's in gleich große Teile aufteilen wollen der Prozesse im Pool. Tho, wenn du das willst, ist die wirkliche Frage, warum brauchst du überhaupt einen 'Pool'? – zwer

+0

Danke. Dies ist mein erster Code, der Multiprocessing verwendet. Ich benutze Pool, weil der Code weniger beängstigend wirkt als viele Prozesse. Ich dachte, Pool würde sich darum kümmern. Gerade jetzt lesen auf Pool vs Prozess Gibt es einen besseren Weg? – Raja

+0

Ich werde Millionen haben, wenn nicht Milliarden von Gegenständen in "funktioniert". Also denke ich, "Pool" ist besser geeignet, als so viele "Prozesse" zu erzeugen. – Raja

Antwort

0

Ok, ich werde dies als eine Antwort erweitern.

Der springende Punkt einer multiprocessing.Pool ist für sie eine Reihe von Prozessen zu spawnen und dann die Arbeit über sie in einer First-Free-First-Tasked-Art zu verteilen. Das heißt, wenn Sie n Artikel zu verarbeiten und p Anzahl der Prozesse in Ihrem Pool haben, wählen Sie (oder p * chunksize wenn chunksize definiert ist) Anzahl der Elemente und senden Sie jedes der Elemente an einen separaten Prozess für die Verarbeitung. Sobald ein Prozess die Verarbeitung eines Elements beendet hat und effektiv freigegeben wurde, wird der Pool das nächste Element abholen, es an den freigegebenen Prozess senden und so weiter, bis weitere Elemente nicht mehr vorhanden sind. Dies gewährleistet eine optimale Nutzung Ihrer erstellten Prozesse, ohne dass Sie die Distribution selbst verwalten müssen. Dies bedeutet auch, dass multiprocessing.Pool nicht für jede Situation geeignet ist. In Ihrem Fall, basierend auf dem vorgestellten Code, möchten Sie Ihr iterable gleichmäßig über eine feste Anzahl von Prozessen aufteilen, so dass der Pool nur ein Overhead wäre - es werden keine weiteren Daten mehr verteilt, wenn ein Prozess beendet ist. Wenn Sie nur die Daten teilen möchten und jeden Brocken zu einem anderen Prozess senden ist es so einfach wie:

import multiprocessing 

if __name__ == "__main__": # always guard your multiprocessing code 
    cores = max(multiprocessing.cpu_count() - 1, 1) # ensure at least one process 

    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()] 
    chunk_size = (len(works) + cores - 1) // cores # rough chunk size estimate 

    processes = [] # a simple list to hold our process references 
    for i in range(cores): 
     work_set = works[i*chunk_size:(i+1)*chunk_size] 
     process = multiprocessing.Process(target=get_hit_rules, args=(work_set,)) 
     process.start() 
     processes.append(process) 

    results = [process.join() for process in processes] # get the data back 

Dies wird genau das tun, was Sie zu tun versucht haben - eine cpu_count() der Prozesse initiieren und senden jeweils eine (grob gesagt, der letzte Prozess wird ein bisschen weniger Daten auf durchschnittlich.) gleichmäßig großen Stück Ihrer Daten in einer Weise, dass alle Ihre Daten auf einmal parallel verarbeitet werden.

Natürlich, wenn Ihre Daten zu groß ist, wie Sie zusätzlich im Kommentar geklärt haben diese unüberschaubaren am Ende wird und dann können Sie multiprocessing.Pool wieder zurück überschaubare Einheiten Ihrer Daten zu den erzeugten Prozessen senden zu verarbeiten in eine Reihe. Darüber hinaus ist der Aufbau der works Liste auch sinnlos - warum möchten Sie eine Liste mit Milliarden von Elementen erstellen, die Sie bereits die Daten in Ihrem sampled_patterns dict haben?

Warum nicht einzelne Artikel von Ihrem sampled_patterns dict senden, anstatt eine Zwischenliste zu erstellen, nur so können Sie es auf die multiprocessing.Pool zuordnen?Um dies zu tun, alles was Sie brauchen ist eine Art eines Iterators Slicer zu erstellen und ihn an multiprocessing.Pool.imap statt und lassen Sie den Pool, den Rest intern verwalten, so:

import multiprocessing 

def patterns_slicer(patterns, size, hit_rates): 
    pos = 0 # store our current position 
    patterns = patterns.items() # use the items iterator 
    while pos < len(patterns): 
     yield [(p, s, hit_rates) for p, s in patterns[pos:pos+size]] 
     pos += size 

if __name__ == "__main__": # always guard your multiprocessing code 
    cores = max(multiprocessing.cpu_count() - 1, 1) # ensure at least one process 
    pool = multiprocessing.Pool(processes=cores) 
    # lets use chunks of 100 patterns each 
    results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates)) 

Natürlich multiprocessing.Pool.imap macht eine Menge Wenn also Ihre Originaldaten zu groß sind oder Sie riesige Teile verwenden möchten, sollten Sie die Implementierung Ihres eigenen imap mit Just-in-Time-Datenabruf in Erwägung ziehen. Überprüfen Sie this answer für ein Beispiel.

+0

Vielen Dank für Ihre Mühe und Zeit! Bin dankbar. Ich konnte noch nicht mit meinem Code arbeiten. Eine Sache, die ich wahrscheinlich hätte erwähnen sollen, ist, dass ich 'Manager(). Dict()' verwende, um die Ergebnisse von jedem Prozess zu aktualisieren. Wenn ich meine Jobs als Generator übergebe, funktioniert es. Tut aber nichts, wenn ich 'iterator_slicer' passiere. – Raja

Verwandte Themen