Ich versuche, eine asyncio
Worker-Klasse zu machen, die Aufträge aus einer Job-Warteschlange konsumieren und bis zu N
Jobs parallel verarbeiten wird. Einige Jobs können zusätzliche Jobs in die Warteschlange stellen. Wenn die Jobwarteschlange leer ist und der Worker alle aktuellen Jobs beendet, sollte er beendet werden.Asyncio Worker, der N Jobs gleichzeitig bearbeitet?
Ich kämpfe immer noch mit asyncio
konzeptionell. Hier ist einer meiner Versuche, wo N=3
:
import asyncio, logging, random
async def do_work(id_):
await asyncio.sleep(random.random())
return id_
class JobQueue:
''' Maintains a list of all pendings jobs. '''
def __init__(self):
self._queue = asyncio.Queue()
self._max_id = 10
for id_ in range(self._max_id):
self._queue.put_nowait(id_ + 1)
def add_job(self):
self._max_id += 1
self._queue.put_nowait(self._max_id)
async def get_job(self):
return await self._queue.get()
def has_jobs(self):
return self._queue.qsize() > 0
class JobWorker:
''' Processes up to 3 jobs at a time in parallel. '''
def __init__(self, job_queue):
self._current_jobs = set()
self._job_queue = job_queue
self._semaphore = asyncio.Semaphore(3)
async def run(self):
while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
print('Acquiring semaphore...')
await self._semaphore.acquire()
print('Getting a job...')
job_id = await self._job_queue.get_job()
print('Scheduling job {}'.format(job_id))
self._current_jobs.add(job_id)
task = asyncio.Task(do_work(job_id))
task.add_done_callback(self.task_finished)
def task_finished(self, task):
job_id = task.result()
print('Finished job {}/released semaphore'.format(job_id))
self._current_jobs.remove(job_id)
self._semaphore.release()
if random.random() < 0.2:
print('Queuing a new job')
self._job_queue.add_job()
loop = asyncio.get_event_loop()
jw = JobWorker(JobQueue())
print('Starting event loop')
loop.run_until_complete(jw.run())
print('Event loop ended')
loop.close()
Ein Auszug aus der Ausgabe:
Starting event loop
Acquiring semaphore...
Getting a job...
Scheduling job 1
Acquiring semaphore...
Getting a job...
Scheduling job 2
Acquiring semaphore...
Getting a job...
Scheduling job 3
Acquiring semaphore...
Finished job 2/released semaphore
Getting a job...
Scheduling job 4
...snip...
Acquiring semaphore...
Finished job 11/released semaphore
Getting a job...
Finished job 12/released semaphore
Finished job 13/released semaphore
Es scheint korrekt alle Aufträge zu verarbeiten, während der Verarbeitung nicht mehr als 3 Jobs zu jeder Zeit. Das Programm hängt jedoch, nachdem der letzte Job beendet ist. Wie durch die Ausgabe angezeigt, scheint es bei job_id = await self._job_queue.get_job()
zu hängen. Sobald die Jobwarteschlange leer ist, wird diese Coroutine nie fortgesetzt, und die Überprüfung, ob die Jobwarteschlange leer ist (am Anfang der Schleife), wird nicht erneut erreicht.
Ich habe versucht, dies auf eine Reihe von Möglichkeiten zu umgehen, aber konzeptionell etwas nicht genau passen. Mein aktuelles WIP gibt einige Futures zwischen der Warteschlange und dem Arbeiter weiter und verwendet dann eine Kombination von asyncio.wait(...)
auf allen von ihnen, aber es wird hässlich und ich frage mich, ob es eine elegante Lösung gibt, die ich übersehe.
Ich mag diese Idee, aber die 'asyncio.wait()' ist ein wenig peinlich, da sie Ergebnisse in einer unvorhersehbaren Reihenfolge zurückgibt. In diesem Spielzeugbeispiel sind die möglichen Ergebnisse nur "None" oder "int", so dass es leicht ist herauszufinden, welche Coroutine tatsächlich beendet ist, aber ich denke in der Praxis wird es sehr mühsam sein herauszufinden, welche Coroutine zuerst fertig war. 'asyncio.gather()' macht etwas mehr Sinn, aber es fehlt der 'return_when' Parameter. Hmm ... –
@mehaase Siehe mein Edit (beachte, dass 'args' eine Liste von Argumenten ist, die an' self.func' oder 'None' übergeben werden, wenn alle Jobs abgeschlossen sind). – Vincent
Ahhhh, es scheint so einfach, nachdem ich die Lösung gesehen habe! Vielen Dank. –