2014-09-18 13 views
9

Ich habe eine Klasse, die in separaten Threads in meiner Anwendung ausgeführt wird. Ich kann mehrere Threads gleichzeitig ausführen und die Threads sind Daemons. Nach einer gewissen Zeit müssen einige dieser Threads empfangen und eine Nachricht verarbeiten. Wie mache ich das?Wie sende ich Daten an einen laufenden Python-Thread?

Eine Probe von meinem Code sieht wie folgt aus:

import threading 
import time 

class MyThread(threading.Thread): 
    def __init__(self, args=(), kwargs=None): 
     threading.Thread.__init__(self, args=(), kwargs=None) 
     self.daemon = True 
     self.receive_messages = args[0] 

    def run(self): 
     print threading.currentThread().getName(), self.receive_messages 

    def do_thing_with_message(self, message): 
     if self.receive_messages: 
      print threading.currentThread().getName(), "Received %s".format(message) 

if __name__ == '__main__': 
    threads = [] 
    for t in range(10): 
     threads.append(MyThread(args=(t % 2 == 0,))) 
     threads[t].start() 
     time.sleep(0.1) 

    for t in threads: 
     t.do_thing_with_message("Print this!") 

Diese Ausgänge:

Thread-1 True 
Thread-2 False 
Thread-3 True 
Thread-4 False 
Thread-5 True 
Thread-6 False 
Thread-7 True 
Thread-8 False 
Thread-9 True 
Thread-10 False 
MainThread Received %s 
MainThread Received %s 
MainThread Received %s 
MainThread Received %s 
MainThread Received %s 

Ich erwarte jedoch, diese letzten fünf Zeilen nicht auf die MainThread und stattdessen bezogen werden von %s, würde ich es mir Print this!, wie so erwarten:

Thread-1 True 
Thread-2 False 
Thread-3 True 
Thread-4 False 
Thread-5 True 
Thread-6 False 
Thread-7 True 
Thread-8 False 
Thread-9 True 
Thread-10 False 
Thread-1 Received Print this! 
Thread-3 Received Print this! 
Thread-5 Received Print this! 
Thread-7 Received Print this! 
Thread-9 Received Print this! 

Wie kann ich eine Nachricht wie diese korrekt an die laufenden Threads senden?

Nachtrag:

Wenn ich diesen Block nach dem Block Print this! haben, und nutzen @ Dano Code das Problem oben zu lösen, es scheint nicht auf diese neuen Nachrichten zu reagieren.

for t in threads: 
    t.queue.put("Print this again!") 
    time.sleep(0.1) 
In diesem Fall

, würde ich das Ende meiner Ausgabe erwarten wie diese können suchen

Thread-1 Received Print this! 
Thread-3 Received Print this! 
Thread-5 Received Print this! 
Thread-7 Received Print this! 
Thread-9 Received Print this! 
Thread-1 Received Print this again! 
Thread-3 Received Print this again! 
Thread-5 Received Print this again! 
Thread-7 Received Print this again! 
Thread-9 Received Print this again! 
+0

Dies ist, was [Task-Schlangen] (http: // www .celeryproject.org /) sind für. –

+1

@BurhanKhalid Ich würde sagen, dass die Verwendung von etwas wie Sellerie hier zuviel wäre. Sellerie ist definitiv nützlich, um Arbeit an mehrere Prozesse oder mehrere Maschinen in einem Cluster zu verteilen, aber etwas viel Einfacheres wird hier den Anforderungen des OP entsprechen. – dano

Antwort

9

Sie ein verwenden Queue.Queue (oder queue.Queue in Python 3) dafür:

import threading 
import time 
from Queue import Queue 

print_lock = threading.Lock() 

class MyThread(threading.Thread): 
    def __init__(self, queue, args=(), kwargs=None): 
     threading.Thread.__init__(self, args=(), kwargs=None) 
     self.queue = queue 
     self.daemon = True 
     self.receive_messages = args[0] 

    def run(self): 
     print threading.currentThread().getName(), self.receive_messages 
     val = self.queue.get() 
     self.do_thing_with_message(val) 

    def do_thing_with_message(self, message): 
     if self.receive_messages: 
      with print_lock: 
       print threading.currentThread().getName(), "Received {}".format(message) 

if __name__ == '__main__': 
    threads = [] 
    for t in range(10): 
     q = Queue() 
     threads.append(MyThread(q, args=(t % 2 == 0,))) 
     threads[t].start() 
     time.sleep(0.1) 

    for t in threads: 
     t.queue.put("Print this!") 

    for t in threads: 
     t.join() 

Wir übergeben eine Queue Instanz an jeden Thread, und senden Sie unsere Nachricht an die Thread mit queue.put. Wir warten auf die Nachricht, die in der run-Methode ankommt, die der Teil des Thread Objekts ist, das tatsächlich in einem separaten Ausführungsthread ausgeführt wird. Sobald wir die Nachricht erhalten, rufen wir do_thing_with_message, die im selben Hintergrund Thread laufen wird.

Ich habe auch einen threading.Lock zu dem Code hinzugefügt, damit die Drucke zu stdout nicht verwechselt werden.

Edit:

Wenn Sie mehrere Nachrichten an den Thread liefern zu können, benutzen Sie einfach eine Schleife:

def run(self): 
    print threading.currentThread().getName(), self.receive_messages 
    while True: 
     val = self.queue.get() 
     if val is None: # If you send `None`, the thread will exit. 
      return 
     self.do_thing_with_message(val) 
+0

Das ist großartig! Es löst das Problem, das ich gestellt habe, aber vielleicht nicht ganz so, wie ich es erwartet habe. Wenn ich eine weitere "Print this!" - Schleife hinzufüge (oder eine beliebige Anzahl von Schleifen, weil ich erwarte, dass Nachrichten an diese Threads gesendet werden können, solange sie existieren), wird nur die erste Nachricht an jeden Thread gedruckt. Wie kann man eine beliebige Anzahl von Nachrichten akzeptieren?(Wenn das zu viel ist, kann ich eine separate Frage stellen und diese als die Antwort meiner ursprünglichen Frage markieren) –

+0

Danke für die Informationen über die Sperre auch. –

+0

@PythonNoob Setzen Sie einfach eine 'while True:' Schleife um die 'val = self.queue.get(); self.do_thing_with_message (val) 'ruft auf. – dano

Verwandte Themen