Ich habe eine pool_map
Funktion, die verwendet werden kann, um die Anzahl der gleichzeitig ausgeführten Funktionen zu begrenzen.Wie kann ein asyncio-Pool abgebrochen werden?
Die Idee eine coroutine function Annahme eines einzigen Parameter zu haben, die auf eine Liste der möglichen Parameter abgebildet wird, sondern auch alle Funktion wickeln Anrufe in eine Semaphore Erwerb, wobei nur eine begrenzte Anzahl auf einmal ausgeführt wird:
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')
async def pool_map(
func: Callable[[A], Awaitable[V]],
arg_it: Iterable[A],
size: int=10
) -> Generator[Awaitable[V], None, None]:
"""
Maps an async function to iterables
ensuring that only some are executed at once.
"""
semaphore = Semaphore(size)
async def sub(arg):
async with semaphore:
return await func(arg)
return map(sub, arg_it)
Ich modifizierte und testete nicht oben Code für ein Beispiel, aber meine Variante funktioniert gut. Z.B. Sie können es wie folgt verwenden:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
URLS = [...]
async def run_all(awaitables):
for a in as_completed(awaitables):
result = await a
print('got result', result)
async def download(url): ...
if __name__ != '__main__':
pool = pool_map(download, URLS)
with closing(get_event_loop()) as loop:
loop.run_until_complete(run_all(pool))
Aber ein Problem entsteht, wenn es eine Ausnahme in Erwartung einer Zukunft geworfen ist. Ich kann nicht sehen, wie man alle geplanten oder noch laufenden Aufgaben abbricht, noch die, die immer noch darauf warten, dass der Semaphor erworben wird.
Gibt es dafür eine Bibliothek oder einen eleganten Baustein, oder muss ich alle Teile selber bauen? (Dh ein Semaphore
mit Zugang zu seinen Kellnern, ein as_finished
, den Zugriff auf seine laufenden Task-Warteschlange stellt, ...)