2017-12-17 7 views
0

Ich habe einen gevent.pool (mit fester Größe) zwischen mehreren Task-Produzenten geteilt. Jeder Task-Produzent kann ein neues Greenlet auf den Pool anwenden, wenn freie Slots vorhanden sind. Nachdem Aufgaben zum Pool hinzugefügt wurden, sollte der Task-Produzent warten, bis alle hinzugefügten Aufgaben beendet sind.Gevent: wie man auf den Satz von Greenlets wartet

Ich habe versucht, gevent.queue.JoinableQueue zu verwenden, um zu warten, bis alle Aufgaben erledigt sind. Es funktioniert, außer dass ich am Ende des Wartens eine sehr ärgerliche Ausnahme bekomme.

Wie kann ich den folgenden Code beheben, um das zu vermeiden? Vielleicht mache ich etwas falsch?

from gevent import monkey, sleep; monkey.patch_all() 
from gevent.queue import JoinableQueue 
from gevent.pool import Pool 

pool = Pool(3) 


def worker(n): 
    print 'Worker {} started'.format(n) 
    sleep(1) 
    print 'Worker {} finished'.format(n) 
    return n 


def main(): 
    results = [] 

    queue = JoinableQueue() 
    for job_no in range(5): 
     pool.wait_available() 
     greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret)) 
     queue.put(greenlet) 
     sleep(.05) 
    print 'All workers added' 

    queue.join() 
    print 'All workers finished', results 


if __name__ == '__main__': 
    main() 

Ausgang:

Worker 0 started 
Worker 1 started 
Worker 2 started 
Worker 0 finished 
Worker 3 started 
Worker 1 finished 
Worker 4 started 
All workers added 
Worker 2 finished 
Worker 3 finished 
Worker 4 finished 
Traceback (most recent call last): 
    File "main.py", line 32, in <module> 
    main() 
    File "main.py", line 27, in main 
    queue.join() 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join 
    return self._cond.wait(timeout=timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait 
    return self._wait(timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait 
    gotit = self._wait_core(timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core 
    result = self.hub.switch() 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch 
    return RawGreenlet.switch(self) 
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>) 

Antwort

1

Sie erhalten ‚Diese Operation würde blockieren immer‘ Fehler, da kein greenlet ist Aufgaben in der Warteschlange zu verbrauchen, queue.join() nur blockiert, bis alle greenlets fertig ist, dann ist die Ausnahme angehoben.

JoinableQueue ist hier nicht nötig, verwenden Sie gevent.joinall() für alle greenlets warten zu beenden:

import gevent 

def main(): 
    results = [] 
    gs = [] 
    for job_no in range(5): 
     greenlet = .. 
     gs.append(greenlet) 
    gevent.joinall(gs) 
    print 'All workers finished', results 
Verwandte Themen