2009-09-24 12 views
7

Also, ich habe eine Anwendung, die Twisted + Stomper als STOMP-Client verwendet, die Arbeit in einem Multiprozessing.Pool von Arbeitern farmt.Twisted-Network-Client mit Multiprocessing-Mitarbeitern?

Dies scheint ok zu arbeiten, wenn ich dies nur ein Python-Skript verwenden, um Feuer, die (vereinfacht) sieht wie folgt aus etwas:

# stompclient.py 

logging.config.fileConfig(config_path) 
logger = logging.getLogger(__name__) 

# Add observer to make Twisted log via python 
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool. (child processes get forked off immediately) 
pool = multiprocessing.Pool(processes=processes) 

StompClientFactory.username = username 
StompClientFactory.password = password 
StompClientFactory.destination = destination 
reactor.connectTCP(host, port, StompClientFactory()) 
reactor.run() 

Da dies für die Bereitstellung verpackt wird, ich dachte, dass ich den Vorteil nehmen würde des Twisted-Skripts und führen Sie dies aus einer Tac-Datei.

meine hier mit sehr ähnlich aussehenden tac-Datei:

# stompclient.tac 

logging.config.fileConfig(config_path) 
logger = logging.getLogger(__name__) 

# Add observer to make Twisted log via python 
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool. (child processes get forked off immediately) 
pool = multiprocessing.Pool(processes=processes) 

StompClientFactory.username = username 
StompClientFactory.password = password 
StompClientFactory.destination = destination 

application = service.Application('myapp') 

service = internet.TCPClient(host, port, StompClientFactory()) 
service.setServiceParent(application) 

Aus Gründen der Illustration, ich habe ein paar Details zusammengebrochen oder geändert; Hoffentlich waren sie nicht die Essenz des Problems. Zum Beispiel hat meine App ein Plugin-System, der Pool wird durch eine separate Methode initialisiert, und dann wird die Arbeit an den Pool delegiert, indem pool.apply_async() verwendet wird, indem ich eine der process() Methoden meines Plugins übergebe.

Also, wenn ich das Skript (stompclient.py) ausführen, funktioniert alles wie erwartet.

Es scheint auch OK zu arbeiten, wenn ich Twist in Nicht-Daemon-Modus (-n) ausgeführt:

twistd -noy stompclient.tac 

jedoch tut es nicht Arbeit, wenn ich im Daemon-Modus ausgeführt:

twistd -oy stompclient.tac 

Die Anwendung scheint zu starten OK, aber wenn es versucht, die Arbeit abzuzweigen, hängt es einfach. Mit "hängen" meine ich, dass es scheint, dass der Child-Prozess nie aufgefordert wird, etwas zu tun, und dass das Elternelement (das "pool.apply_async()" genannt wird) nur darauf wartet, dass die Antwort zurückkommt.

Ich bin mir sicher, dass ich mit Twisted + Multiprocessing etwas Dummes mache, aber ich hoffe wirklich, dass mir jemand den Fehler in meinem Ansatz erklären kann.

Vielen Dank im Voraus!

Antwort

12

Da der Unterschied zwischen Ihrem Arbeitsaufruf und Ihrem nichtaufrufenden Aufruf nur die Option "-n" ist, scheint es am wahrscheinlichsten, dass das Problem durch den Daemonisierungsprozess verursacht wird (was "-n" verhindert).

In POSIX ist einer der Schritte, die bei der Daemonisierung beteiligt sind, das Forking und der Eltern-Exit. Dies hat unter anderem zur Folge, dass Ihr Code in einem anderen Prozess als dem ausgeführt wird, in dem die .tac-Datei ausgewertet wurde. Dadurch wird auch die Kind/Eltern-Beziehung von Prozessen neu geordnet, die in der .tac-Datei gestartet wurden - wie Ihr Pool von Multiprocessing-Prozessen.

Die Prozesse des Multiprocessing-Pools beginnen mit einem Elternteil des Twistd-Prozesses, den Sie starten. Wenn dieser Prozess jedoch als Teil der Daemonisierung beendet wird, wird der übergeordnete Prozess zum Systeminitialisierungsprozess. Dies kann einige Probleme verursachen, obwohl es wahrscheinlich nicht das hängende Problem ist, das Sie beschrieben haben.Es gibt wahrscheinlich andere ähnliche Implementierungsdetails auf niedriger Ebene, die normalerweise erlauben, dass das Multiprozessormodul funktioniert, die jedoch durch den Daemonisierungsprozess gestört werden.

Glücklicherweise sollte diese seltsame Wechselwirkung zu vermeiden unkompliziert sein. Die Dienst-APIs von Twisted ermöglichen die Ausführung von Code nach Abschluss der Daemonisierung. Wenn Sie diese APIs verwenden, können Sie die Initialisierung des Prozesspools des Multiprozessormoduls bis nach der Daemonisierung verzögern und das Problem hoffentlich vermeiden. Hier ist ein Beispiel dafür, was das aussehen könnte:

from twisted.application.service import Service 

class MultiprocessingService(Service): 
    def startService(self): 
     self.pool = multiprocessing.Pool(processes=processes) 

MultiprocessingService().setServiceParent(application) 

nun getrennt, können Sie auch auf Probleme stoßen im Zusammenhang aufzuräumen des Kindes Prozesse des Multiprocessing-Modul oder möglicherweise Probleme mit Prozessen erstellt mit Verdrehte der Prozesserstellung API, reactor.spawnProzess. Dies liegt daran, dass ein Teil des Umgangs mit Kindprozessen im Allgemeinen den Umgang mit dem SIGCHLD-Signal beinhaltet. Twisted und Multiprocessing werden in dieser Hinsicht jedoch nicht kooperieren, so dass einer von ihnen über alle aussteigenden Kinder informiert wird und der andere nie benachrichtigt wird. Wenn Sie die Twisted-API nicht zum Erstellen von untergeordneten Prozessen verwenden, ist dies für Sie in Ordnung - aber Sie sollten überprüfen, ob ein Signal-Handler, das das Multiprocessing-Modul zu installieren versucht, tatsächlich "gewinnt" und nicht erhält ersetzt durch Twisteds eigenen Handler.

+2

Das war * extrem * hilfreich. Vielen Dank! –

0

Eine mögliche Idee für Sie ...

Im Daemon-Modus ausgeführt wird Twistd wird stdin, stdout und stderr schließen. Ist etwas, was Ihre Kunden lesen oder schreiben, etwas?

+0

Nichts sollte denjenigen schreiben (und meine Protokollierung geht an syslog ist alles), aber ich frage mich, ob vielleicht einige Low-Level-Fehler auf stderr zu gehen versucht. Es kann frustrierend sein, in Stille zu debuggen :) –