2012-05-19 10 views
8

Dies ist eine Erweiterung meiner letzten Frage Avoiding race conditions in Python 3's multiprocessing Queues. Hoffentlich ist diese Version der Frage spezifischer.Effizientes Multiprocessing der massiven, brachialen Kraftmaximierung in Python 3

TL; DR: Warum arbeiten Worker-Prozesse in einem Multiprocessing-Modell, in dem Arbeitsprozesse aus einer Warteschlange mit multiprocessing.Queue gespeist werden? Jeder Prozess hat seine eigene Eingabewarteschlange, so dass sie sich nicht gegenseitig für die Sperre einer gemeinsamen Warteschlange bekämpfen, aber die Warteschlangen verbringen eine Menge Zeit tatsächlich nur leer. Der Hauptprozess führt einen E/A-gebundenen Thread aus - verlangsamt dies das CPU-gebundene Füllen der Eingabewarteschlangen?

Ich versuche, das maximale Element des kartesischen Produkts von N-Sets jeweils mit M_i-Elementen (für 0 < = ich < = N) unter einer bestimmten Bedingung zu finden. Es sei daran erinnert, dass die Elemente des kartesischen Produkts Länge-N-Tupel sind, deren Elemente Elemente der N Mengen sind. Ich nenne diese Tupel "Kombinationen", um die Tatsache hervorzuheben, dass ich jede Kombination der Originalsätze durchlaufe. Eine Kombination erfüllt die Bedingung, wenn meine Funktion is_feasibleTrue zurückgibt. In meinem Problem versuche ich die Kombination zu finden, deren Elemente das größte Gewicht haben: sum(element.weight for element in combination).

Meine Problemgröße ist groß, aber auch der Server meiner Firma. Ich versuche, den folgenden seriellen Algorithmus als einen parallelen Algorithmus neu zu schreiben.

from operator import itemgetter 
from itertools import product # Cartesian product function from the std lib 
def optimize(sets): 
    """Return the largest (total-weight, combination) tuple from all 
    possible combinations of the elements in the several sets, subject 
    to the constraint that is_feasible(combo) returns True.""" 
    return max(
       map(
        lambda combination: (
         sum(element.weight for element in combination), 
         combination 
        ), 
        filter(
         is_feasible, # Returns True if combo meets constraint 
         product(*sets) 
        ) 
       ), 
       key=itemgetter(0) # Only maximize based on sum of weight 
      ) 

Mein aktueller Multiprocessing-Ansatz ist Arbeitsprozesse zu erstellen und diese Kombinationen mit einer Eingangswarteschlange füttern. Wenn die Worker eine poison pill empfangen, platzieren sie die beste Kombination, die sie gesehen haben, in einer Ausgabewarteschlange und beenden sie. Ich fülle die Eingabewarteschlange aus dem Hauptthread des Hauptprozesses. Ein Vorteil dieser Technik ist, dass ich einen sekundären Thread aus dem Hauptprozess generieren kann, um ein Überwachungstool auszuführen (nur eine REPL, die ich verwenden kann, um zu sehen, wie viele Kombinationen bisher verarbeitet wurden und wie voll die Warteschlangen sind).

Ich hatte ursprünglich alle Arbeiter aus einer Eingabewarteschlange lesen, aber festgestellt, dass keiner von ihnen die CPU traf. Als sie feststellten, dass sie ihre ganze Zeit damit verbrachten, auf queue.get() zu warten, um sie zu entsperren, gab ich ihnen ihre eigenen Warteschlangen. Das erhöhte den Druck auf die CPU, daher dachte ich, dass die Arbeiter häufiger aktiv waren. Die Warteschlangen verbringen jedoch die meiste Zeit leer! (Ich weiß das von der Überwachungs-REPL, die ich erwähnt habe). Dies deutet darauf hin, dass die Hauptschleife, die die Warteschlangen füllt, langsam ist. Hier ist die Schleife:

from itertools import cycle 
main(): 
    # (Create workers, each with its own input queue) 
    # Cycle through each worker's queue and add a combination to that queue 
    for combo, worker in zip(product(*sets), cycle(workers)): 
     worker.in_q.put(combo) 
    # (Collect results and return) 

Ich vermute, der Engpass worker.in_q.put() ist. Wie mache ich das schneller? Mein erster Instinkt war, die Arbeiter langsamer zu machen, aber das macht einfach keinen Sinn ... Ist das Problem, dass der Monitor-Thread die Schleife zu oft stoppt? Wie könnte ich erzählen?

Alternativ gibt es eine andere Möglichkeit, dies zu implementieren, die nicht so viel auf Sperren warten?

Antwort

4

Wie sehen Ihre Elemente aus? Es könnte sein, dass das Beizen, um sie in die Warteschlange zu stellen, langsam ist, was natürlich ein Flaschenhals wäre. Beachten Sie, dass jedes Element unabhängig und immer wieder neu gebeizt wird.

Wenn dies der Fall ist, könnte dieser Ansatz helfen:

  • einen Satz mit Kardinalität Wählen Sie> = die Anzahl der Arbeiter. Idealerweise wäre es viel mehr als die Anzahl der Arbeiter.Nennen Sie diese Menge A und weisen Sie jedem Arbeiter ungefähr gleiche Teilmengen von A zu. Übertragen Sie diese Teilmenge an jeden Worker.
  • den vollständigen Inhalt aller Mengen andere als A zu jedem des Arbeiter (wahrscheinlich durch pickle.dumps einmal und dann zu jedem Arbeiter die gleiche Zeichenfolge zu übertragen, oder möglicherweise durch gemeinsam genutzten Speicher oder was auch immer) verteilen.
  • Dann hat jeder Arbeitnehmer die volle Information er seine Teilmenge tun muss. Es kann auf seinem fröhlichen Weg über product(my_A_subset, *other_sets) (möglicherweise anders bestellt) beginnen, für irgendeine Art von Stopsignal zwischen jedem Auftrag abfragend (oder alle drei Jobs oder was auch immer). Dies muss nicht durch eine Warteschlange geschehen, ein Ein-Bit-Wert für den gemeinsamen Speicher funktioniert einwandfrei.
+0

ich _think_ Ich sehe, was Sie sagen, und wenn dies der Fall, dass eine Menge Neuschreiben erfordern würde. Sie haben definitiv einen Punkt über meine ständig 'Dump' und' Laden' der Elemente (die Instanzen einer Unterklasse von 'collections.Counter' sind). Würde es helfen, wenn ich sie vorseele? Zum Beispiel vor der 'for' Schleife in' main', hat eine Zeile wie 'sets_of_pickles = map (lambda s: Karte (Deponien, s), Sätze)' – wkschwartz

+1

, das wahrscheinlich etwas helfen würde: es auf den Beizkosten sparen würde , aber Sie würden immer noch die gebeizten Strings durch IPC immer und immer und immer und nicht nur einmal pro Arbeiter übertragen. – Dougal