2016-04-18 7 views
2

(python2.7)Gemeinsamer Pool Karte zwischen Prozessen mit objektorientierten Python

Ich versuche, eine Art von Scanner zu tun, die durch CFG Knoten zu gehen hat, und Split in verschiedenen Prozessen auf Verzweigung für den Parallelismus Zweck.

Der Scanner wird durch ein Objekt der Klasse Scanner repräsentiert. Diese Klasse hat eine Methode Traverse, die durch die genannte Grafik geht und bei Bedarf aufteilt.

hier, wie es aussieht:

class Scanner(object): 
    def __init__(self, atrb1, ...): 
     self.attribute1 = atrb1 
     self.process_pool = Pool(processes=4) 
    def traverse(self, ...): 
     [...] 
     if branch: 
      self.process_pool.map(my_func, todo_list). 

Mein Problem ist folgendes: Wie erstelle ich eine Instanz von multiprocessing.Pool, ist, dass zwischen allen meinen Prozessen geteilt? Ich möchte, dass es geteilt wird, denn da ein Pfad wieder aufgeteilt werden kann, möchte ich nicht mit einer Art Gabelbombe enden, und mit demselben Pool kann ich die Anzahl der gleichzeitig ablaufenden Prozesse begrenzen.

Der obige Code funktioniert nicht, da Pool nicht gebeizt werden kann. In der Folge habe ich versucht, dass:

class Scanner(object): 
    def __getstate__(self): 
     self_dict = self.__dict__.copy() 
     def self_dict['process_pool'] 
     return self_dict 
    [...] 

Aber offensichtlich, kommt es self.process_pool nicht definiert in den erstellten Prozesse zu haben.

Dann habe ich versucht, einen Pool als Modul-Attribut zu erstellen:

process_pool = Pool(processes=4) 

def my_func(x): 
    [...] 

class Scanner(object): 
    def __init__(self, atrb1, ...): 
     self.attribute1 = atrb1 
    def traverse(self, ...): 
     [...] 
     if branch: 
      process_pool.map(my_func, todo_list) 

Es funktioniert nicht, und dies erklärt, warum answer. Aber hier kommt die Sache, wo immer ich meinen Pool erstelle, fehlt etwas. Wenn ich diesen Pool am Ende meiner Datei erstelle, sieht er self.attribute1 nicht, genauso wie er answer nicht gesehen hat und mit einem AttributeError fehlschlägt.

Ich versuche noch nicht einmal, es zu teilen, und ich bin schon fest mit Multiprocessing Art und Weise zu tun.

Ich weiß nicht, ob ich nicht das ganze Ding richtig gedacht habe, aber ich kann nicht glauben, dass es so kompliziert ist, etwas so einfaches zu handhaben wie "einen Arbeiterpool zu haben und ihnen Aufgaben zu geben".

Danke,

EDIT: ich mein erstes Problem (Attribute) gelöst, hatte meine Klasse einen Rückruf als ihr Attribut, und dieser Rückruf wurde in der Hauptskriptdatei definiert, nach dem Import der Scanner-Modul ... Aber die Nebenläufigkeit und "Do not fork bomb" -Ding ist immer noch ein Problem.

Antwort

0

Was Sie tun möchten, kann nicht sicher durchgeführt werden. Denken Sie darüber nach, ob Sie eine gemeinsame gemeinsame über Parent-und Worker-Prozesse mit sagen wir zwei Worker-Prozesse hatten. Das übergeordnete Element führt eine map aus, die versucht, zwei Aufgaben auszuführen, und jede Aufgabe muss map zwei weitere Aufgaben. Die beiden übergeteilten Aufgaben werden an jeden Worker und die übergeordneten Blöcke übergeben. Jeder Worker sendet zwei weitere Aufgaben an den gemeinsam genutzten Pool und blockiert sie, damit sie ausgeführt werden können. Aber jetzt sind alle Arbeiter beschäftigt und warten darauf, dass ein Arbeiter frei wird; du bist festgefahren.

Eine sicherere Methode wäre, dass die Mitarbeiter genügend Informationen zurückgeben, um zusätzliche Aufgaben im übergeordneten System zu verteilen.Dann könnten Sie so etwas wie:

class MoreWork(object): 
    def __init__(self, func, *args): 
     self.func = func 
     self.args = args 

pool = multiprocessing.Pool() 
try: 
    base_task = somefunc, someargs 
    outstanding = collections.deque([pool.apply_async(*base_task)]) 
    while outstanding: 
     result = outstanding.popleft().get() 
     if isinstance(result, MoreWork): 
      outstanding.append(pool.apply_async(result.func, result.args)) 
     else: 
      ... do something with a "final" result, maybe breaking the loop ... 
finally: 
    pool.terminate() 

Was die Funktionen sind bis zu Ihnen, sie nur Informationen in einem MoreWork zurückkehren würde, wenn es mehr war, zu tun, eine Aufgabe nicht direkt starten. Es muss sichergestellt werden, dass das Deadlock für alle Mitarbeiter blockiert werden kann, die auf Tasks warten, die sich in der Warteschlange befinden, aber nicht verarbeitet werden, da der Elternteil allein für die Aufgabe verantwortlich ist .

Dies ist auch nicht optimiert; Im Idealfall würden Sie das Warten auf das erste Element in der Warteschlange nicht blockieren, wenn andere Elemente in der Warteschlange vollständig waren. Es ist viel einfacher, dies mit dem concurrent.futures Modul zu tun, speziell mit concurrent.futures.wait, um auf das erste verfügbare Ergebnis von einer beliebigen Anzahl von ausstehenden Aufgaben zu warten, aber Sie benötigen ein PyPI-Paket von Drittanbietern, um concurrent.futures auf Python 2.7 zu erhalten.

Verwandte Themen