Dies ist eine Erweiterung meiner letzten Frage Avoiding race conditions in Python 3's multiprocessing Queues. Hoffentlich ist diese Version der Frage spezifischer.Effizientes Multiprocessing der massiven, brachialen Kraftmaximierung in Python 3
TL; DR: Warum arbeiten Worker-Prozesse in einem Multiprocessing-Modell, in dem Arbeitsprozesse aus einer Warteschlange mit multiprocessing.Queue
gespeist werden? Jeder Prozess hat seine eigene Eingabewarteschlange, so dass sie sich nicht gegenseitig für die Sperre einer gemeinsamen Warteschlange bekämpfen, aber die Warteschlangen verbringen eine Menge Zeit tatsächlich nur leer. Der Hauptprozess führt einen E/A-gebundenen Thread aus - verlangsamt dies das CPU-gebundene Füllen der Eingabewarteschlangen?
Ich versuche, das maximale Element des kartesischen Produkts von N-Sets jeweils mit M_i-Elementen (für 0 < = ich < = N) unter einer bestimmten Bedingung zu finden. Es sei daran erinnert, dass die Elemente des kartesischen Produkts Länge-N-Tupel sind, deren Elemente Elemente der N Mengen sind. Ich nenne diese Tupel "Kombinationen", um die Tatsache hervorzuheben, dass ich jede Kombination der Originalsätze durchlaufe. Eine Kombination erfüllt die Bedingung, wenn meine Funktion is_feasible
True
zurückgibt. In meinem Problem versuche ich die Kombination zu finden, deren Elemente das größte Gewicht haben: sum(element.weight for element in combination)
.
Meine Problemgröße ist groß, aber auch der Server meiner Firma. Ich versuche, den folgenden seriellen Algorithmus als einen parallelen Algorithmus neu zu schreiben.
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
"""Return the largest (total-weight, combination) tuple from all
possible combinations of the elements in the several sets, subject
to the constraint that is_feasible(combo) returns True."""
return max(
map(
lambda combination: (
sum(element.weight for element in combination),
combination
),
filter(
is_feasible, # Returns True if combo meets constraint
product(*sets)
)
),
key=itemgetter(0) # Only maximize based on sum of weight
)
Mein aktueller Multiprocessing-Ansatz ist Arbeitsprozesse zu erstellen und diese Kombinationen mit einer Eingangswarteschlange füttern. Wenn die Worker eine poison pill empfangen, platzieren sie die beste Kombination, die sie gesehen haben, in einer Ausgabewarteschlange und beenden sie. Ich fülle die Eingabewarteschlange aus dem Hauptthread des Hauptprozesses. Ein Vorteil dieser Technik ist, dass ich einen sekundären Thread aus dem Hauptprozess generieren kann, um ein Überwachungstool auszuführen (nur eine REPL, die ich verwenden kann, um zu sehen, wie viele Kombinationen bisher verarbeitet wurden und wie voll die Warteschlangen sind).
Ich hatte ursprünglich alle Arbeiter aus einer Eingabewarteschlange lesen, aber festgestellt, dass keiner von ihnen die CPU traf. Als sie feststellten, dass sie ihre ganze Zeit damit verbrachten, auf queue.get() zu warten, um sie zu entsperren, gab ich ihnen ihre eigenen Warteschlangen. Das erhöhte den Druck auf die CPU, daher dachte ich, dass die Arbeiter häufiger aktiv waren. Die Warteschlangen verbringen jedoch die meiste Zeit leer! (Ich weiß das von der Überwachungs-REPL, die ich erwähnt habe). Dies deutet darauf hin, dass die Hauptschleife, die die Warteschlangen füllt, langsam ist. Hier ist die Schleife:
from itertools import cycle
main():
# (Create workers, each with its own input queue)
# Cycle through each worker's queue and add a combination to that queue
for combo, worker in zip(product(*sets), cycle(workers)):
worker.in_q.put(combo)
# (Collect results and return)
Ich vermute, der Engpass worker.in_q.put()
ist. Wie mache ich das schneller? Mein erster Instinkt war, die Arbeiter langsamer zu machen, aber das macht einfach keinen Sinn ... Ist das Problem, dass der Monitor-Thread die Schleife zu oft stoppt? Wie könnte ich erzählen?
Alternativ gibt es eine andere Möglichkeit, dies zu implementieren, die nicht so viel auf Sperren warten?
ich _think_ Ich sehe, was Sie sagen, und wenn dies der Fall, dass eine Menge Neuschreiben erfordern würde. Sie haben definitiv einen Punkt über meine ständig 'Dump' und' Laden' der Elemente (die Instanzen einer Unterklasse von 'collections.Counter' sind). Würde es helfen, wenn ich sie vorseele? Zum Beispiel vor der 'for' Schleife in' main', hat eine Zeile wie 'sets_of_pickles = map (lambda s: Karte (Deponien, s), Sätze)' – wkschwartz
, das wahrscheinlich etwas helfen würde: es auf den Beizkosten sparen würde , aber Sie würden immer noch die gebeizten Strings durch IPC immer und immer und immer und nicht nur einmal pro Arbeiter übertragen. – Dougal