2017-01-16 2 views
1

Ich habe eine pool_map Funktion, die verwendet werden kann, um die Anzahl der gleichzeitig ausgeführten Funktionen zu begrenzen.Wie kann ein asyncio-Pool abgebrochen werden?

Die Idee eine coroutine function Annahme eines einzigen Parameter zu haben, die auf eine Liste der möglichen Parameter abgebildet wird, sondern auch alle Funktion wickeln Anrufe in eine Semaphore Erwerb, wobei nur eine begrenzte Anzahl auf einmal ausgeführt wird:

from typing import Callable, Awaitable, Iterable, Iterator 
from asyncio import Semaphore 

A = TypeVar('A') 
V = TypeVar('V') 

async def pool_map(
    func: Callable[[A], Awaitable[V]], 
    arg_it: Iterable[A], 
    size: int=10 
) -> Generator[Awaitable[V], None, None]: 
    """ 
    Maps an async function to iterables 
    ensuring that only some are executed at once. 
    """ 
    semaphore = Semaphore(size) 

    async def sub(arg): 
     async with semaphore: 
      return await func(arg) 

    return map(sub, arg_it) 

Ich modifizierte und testete nicht oben Code für ein Beispiel, aber meine Variante funktioniert gut. Z.B. Sie können es wie folgt verwenden:

from asyncio import get_event_loop, coroutine, as_completed 
from contextlib import closing 

URLS = [...] 

async def run_all(awaitables): 
    for a in as_completed(awaitables): 
     result = await a 
     print('got result', result) 

async def download(url): ... 


if __name__ != '__main__': 
    pool = pool_map(download, URLS) 

    with closing(get_event_loop()) as loop: 
     loop.run_until_complete(run_all(pool)) 

Aber ein Problem entsteht, wenn es eine Ausnahme in Erwartung einer Zukunft geworfen ist. Ich kann nicht sehen, wie man alle geplanten oder noch laufenden Aufgaben abbricht, noch die, die immer noch darauf warten, dass der Semaphor erworben wird.

Gibt es dafür eine Bibliothek oder einen eleganten Baustein, oder muss ich alle Teile selber bauen? (Dh ein Semaphore mit Zugang zu seinen Kellnern, ein as_finished, den Zugriff auf seine laufenden Task-Warteschlange stellt, ...)

Antwort

1

Verwenden ensure_future eine Task anstelle eines Koroutine zu bekommen:

import asyncio 
from contextlib import closing 


def pool_map(func, args, size=10): 
    """ 
    Maps an async function to iterables 
    ensuring that only some are executed at once. 
    """ 
    semaphore = asyncio.Semaphore(size) 

    async def sub(arg): 
     async with semaphore: 
      return await func(arg) 

    tasks = [asyncio.ensure_future(sub(x)) for x in args] 

    return tasks 


async def f(n): 
    print(">>> start", n) 

    if n == 7: 
     raise Exception("boom!") 

    await asyncio.sleep(n/10) 

    print("<<< end", n) 
    return n 


async def run_all(tasks): 
    exc = None 
    for a in asyncio.as_completed(tasks): 
     try: 
      result = await a 
      print('=== result', result) 
     except asyncio.CancelledError as e: 
      print("!!! cancel", e) 
     except Exception as e: 
      print("Exception in task, cancelling!") 
      for t in tasks: 
       t.cancel() 
      exc = e 
    if exc: 
     raise exc 


pool = pool_map(f, range(1, 20), 3) 

with closing(asyncio.get_event_loop()) as loop: 
    loop.run_until_complete(run_all(pool)) 
1

Hier ist eine naive Lösung, basierend auf der Tatsache, dass cancel ist ein no-op, wenn die Aufgabe bereits beendet ist:

async def run_all(awaitables): 
    futures = [asyncio.ensure_future(a) for a in awaitables] 
    try: 
     for fut in as_completed(futures): 
      result = await fut 
      print('got result', result) 
    except: 
     for future in futures: 
      future.cancel() 
     await asyncio.wait(futures) 
Verwandte Themen