2013-05-31 7 views
9

Ich bin auf der Suche nach einer Python-Klasse (vorzugsweise Teil der Standardsprache, anstatt einer 3rd-Party-Bibliothek) zu asynchronen Broadcast-Stil Messaging zu verwalten.Need eine thread-sichere asynchrone Nachrichtenwarteschlange

Ich habe einen Thread, der Nachrichten in die Warteschlange setzt (die 'putMessageOnQueue' -Methode darf nicht blockieren) und dann mehrere andere Threads, die alle auf Nachrichten warten, vermutlich eine blockierende "waitForMessage" -Funktion aufgerufen haben. Wenn eine Nachricht in die Warteschlange gestellt wird, möchte ich, dass jeder der wartenden Threads eine eigene Kopie der Nachricht erhält.

Ich habe die integrierte Queue-Klasse angeschaut, aber ich denke nicht, dass dies geeignet ist, weil das konsumieren von Nachrichten das Entfernen aus der Warteschlange beinhaltet, so dass nur 1 Client-Thread jeden sehen würde.

Dies scheint, wie es ein gemeinsamer Anwendungsfall sein soll, kann jemand eine Lösung empfehlen?

+0

I glaube, du kannst deine eigene Klasse aufbauen, die verfolgt, welcher Thread welche Nachricht erhalten hat, ohne viele Probleme. – Bakuriu

Antwort

7

Ich denke, der typische Ansatz ist, für jeden Thread eine separate Nachrichtenwarteschlange zu verwenden und die Nachricht in jede Warteschlange zu schieben, die zuvor ein Interesse am Empfangen solcher Nachrichten registriert hat.

So etwas sollte, arbeiten, aber es ist nicht getesteten Code ...

from time import sleep 
from threading import Thread 
from Queue import Queue 

class DispatcherThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(DispatcherThread, self).__init__(*args, **kwargs) 
     self.interested_threads = [] 

    def run(self): 
     while 1: 
      if some_condition: 
       self.dispatch_message(some_message) 
      else: 
       sleep(0.1) 

    def register_interest(self, thread): 
     self.interested_threads.append(thread) 

    def dispatch_message(self, message): 
     for thread in self.interested_threads: 
      thread.put_message(message) 



class WorkerThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.queue = Queue() 


    def run(self): 

     # Tell the dispatcher thread we want messages 
     dispatcher_thread.register_interest(self) 

     while 1: 
      # Wait for next message 
      message = self.queue.get() 

      # Process message 
      # ... 

    def put_message(self, message): 
     self.queue.put(message) 


dispatcher_thread = DispatcherThread() 
dispatcher_thread.start() 

worker_threads = [] 
for i in range(10): 
    worker_thread = WorkerThread() 
    worker_thread.start() 
    worker_threads.append(worker_thread) 

dispatcher_thread.join() 
+0

Perfekt, das funktioniert super! Schade, dass es keine fertige Version gibt, aber ich denke, das Prinzip ist nicht so kompliziert, wenn es jemand einmal klar erklärt (wie Sie es getan haben). – codebox

+0

@codebox Nun, es gibt eine bessere Unterstützung im Modul "Multiprocessing" (http://docs.python.org/2/library/multiprocessing.html), aber das ist eher für Subprozesse als für Threads. Ich denke, das liegt daran, dass Interprozess-Kommunikation normalerweise komplexer ist als Inter-Thread-Kommunikation, da Threads den gleichen Heap teilen. – Aya

2

Ich denke, das ein geradlinig Beispiel (aus der Warteschlange genommen Beispiel in Python Lib)

from threading import Thread 
from Queue import Queue 


num_worker_threads = 2 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+0

Wie erfüllt das die Anforderungen der Frage? Er sagte explizit, dass die Warteschlange nicht funktioniert, da jeder Thread eine Kopie des Elements benötigt. – Wlerin