2009-05-27 8 views
11

Ich schreibe ein Server-Programm mit einem Hersteller und mehreren Consumern, was mich verwirrt ist nur der erste Task-Produzent in die Warteschlange gestellt bekommt verbraucht, nach denen Aufgaben eingereiht nicht mehr konsumiert werden, Sie bleiben für immer in der Warteschlange.Producer/Consumer Problem mit Python Multiprocessing

from multiprocessing import Process, Queue, cpu_count 
from http import httpserv 
import time 

def work(queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(5) 
     print "task done:", task 
    queue.put(None) 

class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     self.workers = [Process(target=work, args=(self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     httpserv(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESSES): 
      self.workers[i].join() 
     queue.close() 

Manager().start() 

Der Hersteller ist ein HTTP-Server, der eine Aufgabe in der Warteschlange einmal eine Anforderung von dem Benutzer erhalten setzen. Es scheint, dass Consumer-Prozesse sind immer noch blockiert , wenn neue Aufgaben in der Warteschlange sind, was seltsam ist.

P.S. Noch zwei Fragen, die nicht in Bezug auf die oben genannten, ich bin nicht sicher, ob ist es besser, HTTP-Server in einen eigenen Prozess als die Haupt Prozess, wenn ja wie kann ich den Hauptprozess laufen lassen, bevor alle Kinder Prozesse enden. Zweite Frage, was ist der beste Weg, um den HTTP-Server anmutig zu stoppen?

bearbeiten: Erzeugercode hinzufügen, es ist nur eine einfache Python wsgi Server:

import fapws._evwsgi as evwsgi 
from fapws import base 

def httpserv(queue): 
    evwsgi.start("0.0.0.0", 8080) 
    evwsgi.set_base_module(base) 

    def request_1(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_1') 
     return ["request 1!"] 

    def request_2(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_2') 
     return ["request 2!!"] 

    evwsgi.wsgi_cb(("/request_1", request_1)) 
    evwsgi.wsgi_cb(("/request_2", request_2)) 

    evwsgi.run() 

Antwort

7

ich da denken, dass etwas falsch mit dem Web-Server-Teil sein muss, da diese perfekt funktioniert:

from multiprocessing import Process, Queue, cpu_count 
import random 
import time 


def serve(queue): 
    works = ["task_1", "task_2"] 
    while True: 
     time.sleep(0.01) 
     queue.put(random.choice(works)) 


def work(id, queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(0.05) 
     print "%d task:" % id, task 
    queue.put(None) 


class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     print "starting %d workers" % self.NUMBER_OF_PROCESSES 
     self.workers = [Process(target=work, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     serve(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESS): 
      self.workers[i].join() 
     queue.close() 


Manager().start() 

Beispielausgabe:

starting 2 workers 
0 task: task_1 
1 task: task_2 
0 task: task_2 
1 task: task_1 
0 task: task_1 
+0

Ehrfürchtig Während Wenn Sie einen Hersteller + Multi-Arbeiter Beispiel bieten könnte. Es wäre nett. –

4

"Zweite Frage, was ist der beste Weg, den HTTP-Server anmutig zu stoppen?"

Das ist schwer.

Sie haben zwei Möglichkeiten für Interprocess Communication:

  • Out-of-Band-Kontrollen. Der Server hat einen anderen Mechanismus für die Kommunikation. Ein anderer Sockel, ein Unix-Signal oder etwas anderes. Das etwas andere könnte eine "stop-now" Datei im lokalen Verzeichnis des Servers sein. Scheint seltsam, aber es funktioniert gut und ist einfacher als eine Auswahlschleife einzuführen, um auf mehrere Sockets oder einen Signalhandler zu hören, um ein Unis-Signal zu erfassen.

    Die Datei "stop-now" ist einfach zu implementieren. Die Schleife evwsgi.run() prüft lediglich nach jeder Anfrage auf diese Datei. Um den Server zu stoppen, erstellen Sie die Datei, führen Sie eine /control Anfrage (die einen Fehler 500 oder etwas bekommt, ist es nicht wirklich wichtig) und der Server sollte zum Stillstand kommen. Denken Sie daran, die stop-now-Datei zu löschen, da Ihr Server andernfalls nicht neu gestartet wird.

  • In-Band-Kontrollen. Der Server hat eine andere URL (/stop), die ihn stoppt. Oberflächlich gesehen scheint dies ein Sicherheitsalarm, aber es hängt ganz davon ab, wo und wie dieser Server verwendet wird. Da es sich um einen einfachen Wrapper um eine interne Anforderungswarteschlange handelt, funktioniert diese zusätzliche URL gut.

    Damit dies funktioniert, müssen Sie Ihre eigene Version von evwsgi.run() schreiben, die beendet werden kann, indem Sie eine Variable so setzen, dass sie aus der Schleife ausbricht.

bearbeiten

Sie wollen wahrscheinlich nicht Ihren Server beenden, da Sie den Zustand nicht wissen davon Worker-Threads ist. Sie müssen den Server signalisieren, und dann müssen Sie nur warten, bis es die Dinge normal beendet.

Wenn Sie den Server gewaltsam beenden möchten, funktioniert os.kill() (oder multiprocessing.terminate). Außer natürlich, du weißt nicht, was die Kind-Threads machten.

+0

Wie wäre es, den Server in einen eigenen Prozess zu versetzen und die multiprocessing.Process.terminate-Methode zu verwenden, um den Prozess zu beenden? Das scheint einfacher zu sein. – btw0