2009-12-11 19 views

Antwort

59

Ich schlage vor, Sie Queue.Queue instanziiert, bevor der Faden beginnen, und es als eine der args Thread passieren: vor den Thread beendet, es .put s das Ergebnis in der Warteschlange als Argument empfangen. Die Eltern können .get oder .get_nowait es nach Belieben.

Queues sind in der Regel der beste Weg, die Thread-Synchronisation und Kommunikation in Python zu arrangieren: sie sind intrinsisch Thread-sicher, Message-Passing-Fahrzeuge - der beste Weg, im Allgemeinen zu organisieren Multitasking -)

+2

'vor den Thread beendet zu arbeiten, es .puts das Ergebnis auf der Warteschlange es als argument' erhalten Sie bedeutet dies automatisch von Python getan werden? Wenn nicht (als Designtipp gedacht), dann könntest du es in der Antwort klarstellen. – n611x007

+3

Es ist hässlich, eine bestehende Funktion dafür zu spezialisieren; und die Warteschlange hat viel unnötigen Overhead für ein einzelnes Ergebnisproblem. Eine klarere und effizientere Unterklasse 'threading.Thread' und die neue Methode run() speichern das Ergebnis einfach als Attribut wie' self.ret = ... '(Viel komfortabler wäre eine Unterklasse von Thread, die Rückgabewerte/Ausnahmen von behandelt die benutzerdefinierte target - Funktion, in der Tat sollte 'threading.Thread' erweitert werden, um das out of the box anzubieten - wie es mit dem alten Verhalten" return None "kompatibel wäre.) – kxr

1

Nun, im Python-Threading-Modul gibt es Zustandsobjekte, die mit Sperren verknüpft sind. Eine Methode gibt den Wert zurück, der von der zugrunde liegenden Methode zurückgegeben wird. Für weitere Informationen: Python Condition Objects

5

Ein weiterer Ansatz! ist eine Callback-Funktion an den Thread übergeben. Dies gibt eine einfache, sichere und flexible Möglichkeit, einen Wert jederzeit über den neuen Thread an das übergeordnete Element zurückzugeben.

# A sample implementation 

import threading 
import time 

class MyThread(threading.Thread): 
    def __init__(self, cb): 
     threading.Thread.__init__(self) 
     self.callback = cb 

    def run(self): 
     for i in range(10): 
      self.callback(i) 
      time.sleep(1) 


# test 

import sys 

def count(x): 
    print x 
    sys.stdout.flush() 

t = MyThread(count) 
t.start() 
+8

Das Problem dabei ist, dass der Callback noch in der läuft Kind-Thread, anstatt im ursprünglichen Thread. – babbageclunk

+0

@wilberforce könnten Sie bitte erklären, welche Probleme es verursachen kann? –

+4

Ok. Ein Beispiel wäre, wenn der Callback in eine Protokolldatei schreibt, in die der übergeordnete Thread schreibt, während der Thread ausgeführt wird. Da der Callback im untergeordneten Thread ausgeführt wird, besteht das Risiko, dass die beiden Schreibvorgänge gleichzeitig stattfinden und zusammenstoßen - Sie könnten eine verzerrte oder verschachtelte Ausgabe erhalten oder einen Absturz, wenn das Protokollierungsframework eine interne Buchführung ausführt. Durch die Verwendung einer thread-sicheren Warteschlange und eines Threads würde das Schreiben dies alles verhindern. Diese Art von Problemen kann unangenehm sein, weil sie nicht deterministisch sind - sie werden möglicherweise nur in der Produktion angezeigt und können schwierig zu reproduzieren sein. – babbageclunk

12

Wenn Sie beitreten wurden() aufgerufen wird für den Thread zu warten, beenden können Sie einfach das Ergebnis an die sich Thread-Instanz anhängen und dann, nachdem die join() kehrt aus dem Haupt-Thread abzurufen.

Auf der anderen Seite sagen Sie uns nicht, wie Sie herausfinden wollen, dass der Thread fertig ist und dass das Ergebnis verfügbar ist. Wenn Sie bereits eine Möglichkeit haben, dies zu tun, wird es Sie wahrscheinlich (und uns, wenn Sie uns sagen würden) auf den besten Weg hinweisen, die Ergebnisse zu erzielen.

+0

* Sie können das Ergebnis einfach an die Thread-Instanz selbst anhängen * Wie übergeben Sie die Thread-Instanz an das Ziel, das sie ausführt, damit das Ziel das Ergebnis an diese Instanz anhängen kann? –

+1

Piotr Dobrogost, wenn Sie kein Thread für Ihre Instanz erstellen, können Sie einfach threading.current_thread() vom Ende Ihrer Zielaufruffunktion verwenden. Ich würde das ein bisschen hässlich nennen, aber Alex Ansatz war immer der elegantere. Dieser ist in einigen Fällen nur zweckmäßiger. –

+3

Es wäre nett, wenn 'join()' nur das zurückgeben würde, was die aufgerufene Methode zurückgibt ... scheint albern, dass stattdessen 'None' zurückgegeben wird. – ArtOfWarfare

10

Sie sollten eine Warteschlangeninstanz als Parameter übergeben, dann sollten Sie Ihr Rückgabeobjekt in die Warteschlange stellen. Sie können den Rückgabewert über queue.get() sammeln, welches Objekt Sie auch setzen.

Probe:

queue = Queue.Queue() 
thread_ = threading.Thread(
       target=target_method, 
       name="Thread1", 
       args=[params, queue], 
       ) 
thread_.start() 
thread_.join() 
queue.get() 

def target_method(self, params, queue): 
""" 
Some operations right here 
""" 
your_return = "Whatever your object is" 
queue.put(your_return) 

Verwenden mehrerer Threads:

#Start all threads in thread pool 
    for thread in pool: 
     thread.start() 
     response = queue.get() 
     thread_results.append(response) 

#Kill all threads 
    for thread in pool: 
     thread.join() 

ich diese Implementierung verwenden und es funktioniert gut für mich. Ich wünschte, du würdest es tun.

+1

Fehlt Ihnen nicht Ihre thread_.start() ?? – sadmicrowave

+1

Natürlich starte ich den Thread, den ich gerade vermisse, um die Zeile hier zu setzen :) Danke für die Ankündigung. –

+0

Wie würde das aussehen, wenn Sie mehrere Threads hätten? que.get() gibt das Ergebnis eines Threads nur für mich zurück? – ABros

6

Verwendung lambda Ziel Thread-Funktion zu wickeln und den Rückgabewert das Muttergewinde mit einer Warteschlange zurück passieren. (Ihre ursprüngliche Zielfunktion bleibt unverändert ohne zusätzliche Queue-Parameter.)

Beispielcode:

import threading 
import queue 
def dosomething(param): 
    return param * 2 
que = queue.Queue() 
thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2)) 
thr.start() 
thr.join() 
while not que.empty(): 
    print(que.get()) 

Ausgang:

4 
2

POC:

import random 
import threading 

class myThread(threading.Thread): 
    def __init__(self, arr): 
     threading.Thread.__init__(self) 
     self.arr = arr 
     self.ret = None 

    def run(self): 
     self.myJob(self.arr) 

    def join(self): 
     threading.Thread.join(self) 
     return self.ret 

    def myJob(self, arr): 
     self.ret = sorted(self.arr) 
     return 

#Call the main method if run from the command line. 
if __name__ == '__main__': 
    N = 100 

    arr = [ random.randint(0, 100) for x in range(N) ] 
    th = myThread(arr) 
    th.start() 
    sortedArr = th.join() 

    print "arr2: ", sortedArr 
3

können Sie queue synchronisiert verwenden Modul.
Betrachten Sie benötigen einen Benutzerinformationen aus der Datenbank mit einer bekannten ID zu überprüfen:

def check_infos(user_id, queue): 
    result = send_data(user_id) 
    queue.put(result) 

Jetzt können Sie Ihre Daten wie folgt erhalten:

import queue, threading 
queued_request = queue.Queue() 
check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request)) 
check_infos_thread.start() 
final_result = queued_request.get() 
6

ich überrascht bin niemand erwähnt, dass Sie konnte einfach Pass es ein veränderbar:

>>> thread_return={'success': False} 
>>> from threading import Thread 
>>> def task(thread_return): 
... thread_return['success'] = True 
... 
>>> Thread(target=task, args=(thread_return,)).start() 
>>> thread_return 
{'success': True} 

vielleicht hat dies große Probleme, von denen ich nicht weiß.

+1

Das funktioniert perfekt! Ich würde gerne eine Meinung über fehlende Dinge mit diesem Ansatz hören, wenn überhaupt. –

+0

funktioniert. Es ist einfach hässlich, eine bestehende Funktion zu spezialisieren - und diese vielen verwirrenden Dinge (Lesbarkeit) - siehe Kommentar zur ersten Antwort. – kxr

+0

Wie wäre es mit dem Multithread? – backslash112

0

Die folgende Wrapperfunktion wird eine vorhandene Funktions wickeln und ein Objekt zurück, die sowohl auf den Thread-Punkte (so dass man start() nennen kann, join() usw. darauf) sowie den Zugang/seinen eventuellen Rückgabewert anzuzeigen.

def threadwrap(func,args,kwargs): 
    class res(object): result=None 
    def inner(*args,**kwargs): 
    res.result=func(*args,**kwargs) 
    import threading 
    t = threading.Thread(target=inner,args=args,kwargs=kwargs) 
    res.thread=t 
    return res 

def myFun(v,debug=False): 
    import time 
    if debug: print "Debug mode ON" 
    time.sleep(5) 
    return v*2 

x=threadwrap(myFun,[11],{"debug":True}) 
x.thread.start() 
x.thread.join() 
print x.result 

Es sieht nicht gut aus, und die threading.Thread Klasse scheint mit dieser Art von Funktionalität leicht erweitert (*) werden, so frage ich mich, warum es nicht schon da ist. Gibt es einen Fehler bei der obigen Methode?

(*) Beachten Sie, dass die Antwort von Husanu für diese Frage genau dies ist, Unterklasse threading.Thread, was zu einer Version führt, in der join() den Rückgabewert angibt.

1

Basierend auf dem Vorschlag von jcomeau_ictx. Der einfachste, dem ich begegnet bin. Voraussetzung hierfür war, dass der Status "exit status" von drei verschiedenen Prozessen auf dem Server abgerufen wurde und ein anderes Skript ausgelöst wurde, wenn alle drei erfolgreich waren. Dies scheint in Ordnung

class myThread(threading.Thread): 
     def __init__(self,threadID,pipePath,resDict): 
      threading.Thread.__init__(self) 
      self.threadID=threadID 
      self.pipePath=pipePath 
      self.resDict=resDict 

     def run(self): 
      print "Starting thread %s " % (self.threadID) 
      if not os.path.exists(self.pipePath): 
      os.mkfifo(self.pipePath) 
      pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK) 
      with os.fdopen(pipe_fd) as pipe: 
       while True: 
        try: 
        message = pipe.read() 
        if message: 
         print "Received: '%s'" % message 
         self.resDict['success']=message 
         break 
        except: 
         pass 

    tResSer={'success':'0'} 
    tResWeb={'success':'0'} 
    tResUisvc={'success':'0'} 


    threads = [] 

    pipePathSer='/tmp/path1' 
    pipePathWeb='/tmp/path2' 
    pipePathUisvc='/tmp/path3' 

    th1=myThread(1,pipePathSer,tResSer) 
    th2=myThread(2,pipePathWeb,tResWeb) 
    th3=myThread(3,pipePathUisvc,tResUisvc) 

    th1.start() 
    th2.start() 
    th3.start() 

    threads.append(th1) 
    threads.append(th2) 
    threads.append(th3) 

    for t in threads: 
     print t.join() 

    print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc) 
    # The above statement prints updated values which can then be further processed