2016-06-30 5 views
1

Ich versuche, eine asyncio Worker-Klasse zu machen, die Aufträge aus einer Job-Warteschlange konsumieren und bis zu N Jobs parallel verarbeiten wird. Einige Jobs können zusätzliche Jobs in die Warteschlange stellen. Wenn die Jobwarteschlange leer ist und der Worker alle aktuellen Jobs beendet, sollte er beendet werden.Asyncio Worker, der N Jobs gleichzeitig bearbeitet?

Ich kämpfe immer noch mit asyncio konzeptionell. Hier ist einer meiner Versuche, wo N=3:

import asyncio, logging, random 

async def do_work(id_): 
    await asyncio.sleep(random.random()) 
    return id_ 

class JobQueue: 
    ''' Maintains a list of all pendings jobs. ''' 
    def __init__(self): 
     self._queue = asyncio.Queue() 
     self._max_id = 10 
     for id_ in range(self._max_id): 
      self._queue.put_nowait(id_ + 1) 

    def add_job(self): 
     self._max_id += 1 
     self._queue.put_nowait(self._max_id) 

    async def get_job(self): 
     return await self._queue.get() 

    def has_jobs(self): 
     return self._queue.qsize() > 0 

class JobWorker: 
    ''' Processes up to 3 jobs at a time in parallel. ''' 
    def __init__(self, job_queue): 
     self._current_jobs = set() 
     self._job_queue = job_queue 
     self._semaphore = asyncio.Semaphore(3) 

    async def run(self): 
     while self._job_queue.has_jobs() or len(self._current_jobs) > 0: 
      print('Acquiring semaphore...') 
      await self._semaphore.acquire() 
      print('Getting a job...') 
      job_id = await self._job_queue.get_job() 
      print('Scheduling job {}'.format(job_id)) 
      self._current_jobs.add(job_id) 
      task = asyncio.Task(do_work(job_id)) 
      task.add_done_callback(self.task_finished) 

    def task_finished(self, task): 
     job_id = task.result() 
     print('Finished job {}/released semaphore'.format(job_id)) 
     self._current_jobs.remove(job_id) 
     self._semaphore.release() 
     if random.random() < 0.2: 
      print('Queuing a new job') 
      self._job_queue.add_job() 

loop = asyncio.get_event_loop() 
jw = JobWorker(JobQueue()) 
print('Starting event loop') 
loop.run_until_complete(jw.run()) 
print('Event loop ended') 
loop.close() 

Ein Auszug aus der Ausgabe:

Starting event loop 
Acquiring semaphore... 
Getting a job... 
Scheduling job 1 
Acquiring semaphore... 
Getting a job... 
Scheduling job 2 
Acquiring semaphore... 
Getting a job... 
Scheduling job 3 
Acquiring semaphore... 
Finished job 2/released semaphore 
Getting a job... 
Scheduling job 4 
...snip... 
Acquiring semaphore... 
Finished job 11/released semaphore 
Getting a job... 
Finished job 12/released semaphore 
Finished job 13/released semaphore 

Es scheint korrekt alle Aufträge zu verarbeiten, während der Verarbeitung nicht mehr als 3 Jobs zu jeder Zeit. Das Programm hängt jedoch, nachdem der letzte Job beendet ist. Wie durch die Ausgabe angezeigt, scheint es bei job_id = await self._job_queue.get_job() zu hängen. Sobald die Jobwarteschlange leer ist, wird diese Coroutine nie fortgesetzt, und die Überprüfung, ob die Jobwarteschlange leer ist (am Anfang der Schleife), wird nicht erneut erreicht.

Ich habe versucht, dies auf eine Reihe von Möglichkeiten zu umgehen, aber konzeptionell etwas nicht genau passen. Mein aktuelles WIP gibt einige Futures zwischen der Warteschlange und dem Arbeiter weiter und verwendet dann eine Kombination von asyncio.wait(...) auf allen von ihnen, aber es wird hässlich und ich frage mich, ob es eine elegante Lösung gibt, die ich übersehe.

Antwort

2

Sie könnten Vorteil queue.task_done nehmen, die angibt, dass eine früher die Warteschlange eingereiht Aufgabe abgeschlossen ist. Dann können Sie queue.join und queue.get mit asyncio.wait kombinieren: wenn queue.join beendet und queue.get nicht, bedeutet dies, dass alle Aufträge abgeschlossen wurden.

Sehen Sie folgendes Beispiel:

class Worker: 

    def __init__(self, func, n=3): 
     self.func = func 
     self.queue = asyncio.Queue() 
     self.semaphore = asyncio.Semaphore(n) 

    def put(self, *args): 
     self.queue.put_nowait(args) 

    async def run(self): 
     while True: 
      args = await self._get() 
      if args is None: 
       return 
      asyncio.ensure_future(self._target(args)) 

    async def _get(self): 
     get_task = asyncio.ensure_future(self.queue.get()) 
     join_task = asyncio.ensure_future(self.queue.join()) 
     await asyncio.wait(coros, return_when='FIRST_COMPLETED') 
     if get_task.done(): 
      return task.result() 

    async def _target(self, args): 
     try: 
      async with self.semaphore: 
       return await self.func(*args) 
     finally: 
      self.queue.task_done() 
+0

Ich mag diese Idee, aber die 'asyncio.wait()' ist ein wenig peinlich, da sie Ergebnisse in einer unvorhersehbaren Reihenfolge zurückgibt. In diesem Spielzeugbeispiel sind die möglichen Ergebnisse nur "None" oder "int", so dass es leicht ist herauszufinden, welche Coroutine tatsächlich beendet ist, aber ich denke in der Praxis wird es sehr mühsam sein herauszufinden, welche Coroutine zuerst fertig war. 'asyncio.gather()' macht etwas mehr Sinn, aber es fehlt der 'return_when' Parameter. Hmm ... –

+0

@mehaase Siehe mein Edit (beachte, dass 'args' eine Liste von Argumenten ist, die an' self.func' oder 'None' übergeben werden, wenn alle Jobs abgeschlossen sind). – Vincent

+0

Ahhhh, es scheint so einfach, nachdem ich die Lösung gesehen habe! Vielen Dank. –

2

Sie können get_job mit einfach asyncio.wait_for Timeout. Zum Beispiel mit 1s und zurück zum Anfang der Schleife bei Timeout.

async def run(self): 
     while self._job_queue.has_jobs() or len(self._current_jobs) > 0: 
      print('Acquiring semaphore...') 
      await self._semaphore.acquire() 
      print('Getting a job...') 
      try: 
       job_id = await asyncio.wait_for(self._job_queue.get_job(), 1) 
      except asyncio.TimeoutError: 
       continue 
      print('Scheduling job {}'.format(job_id)) 
      self._current_jobs.add(job_id) 
      task = asyncio.Task(do_work(job_id)) 
      task.add_done_callback(self.task_finished) 
+0

+1 Dies ist definitiv die einfachste und sauberste fix, aber es fühlt sich ein bisschen wie Betrug. Anstatt darauf zu warten, dass die Ereignisschleife mir sagt, wann ich fortfahren soll, frage ich die Jobwarteschlange ab. (Nun, eine Mischung aus Polling und Koroutinen.) –

Verwandte Themen