2016-09-08 7 views
2

Ich versuche, asyncio verwenden, um gleichzeitige Netzwerk-I/O zu behandeln. Eine sehr große Anzahl von Funktionen ist an einem einzelnen Punkt zu planen, die sich stark in der Zeit unterscheiden, die für jeden einzelnen Vorgang benötigt wird. Empfangene Daten werden dann in einem separaten Prozess für jede Ausgabe verarbeitet.warten auf jede zukünftige asyncio

Die Reihenfolge, in der die Daten verarbeitet werden, ist nicht relevant, deshalb würde ich angesichts der möglicherweise sehr langen Warteperiode für die Ausgabe await für das, was Zukunft zuerst beendet, anstelle einer vordefinierten Reihenfolge.

def fetch(x): 
    sleep() 

async def main(): 
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)] 
    for f in futures: 
     await f 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main()) 

Normalerweise, bei dem um wartet Futures wurden der Warteschlange ist in Ordnung:

Well behaved functions profiler graph

blaue Farbe repräsentiert Zeit jede Aufgabe in Testamentsvollstrecker der Warteschlange, dh run_in_executor, genannt worden ist, aber die Funktion war noch nicht ausgeführt, da der Executor nur 5 Tasks gleichzeitig ausführt; grün ist die Zeit, die für die Ausführung der Funktion benötigt wird; und die rote Zeit ist die Zeit, die darauf verwendet wird, auf alle vorherigen Futures auf await zu warten.

Volatile functions profiler graph

In meinem Fall, in das Funktion stark in der Zeit variiert, gibt es eine Menge Zeit für die vorherige Futures in der Warteschlange auf der Warte verloren zu erwarten, während ich lokal GET Ausgabe Verarbeitung sein könnte. Dies führt dazu, dass mein System für einige Zeit untätig bleibt, um dann überfordert zu werden, wenn mehrere Ausgänge gleichzeitig ausgeführt werden, und dann zurück in den Leerlauf zu springen und auf weitere Anforderungen zu warten.

Gibt es einen Weg zu await welche Zukunft wird zuerst im Executor abgeschlossen?

+0

Was haben Sie verwendet, um die Ausführung der Coroutinen zu visualisieren? :) – PovilasB

+0

@PovilasB Viele Protokollierung 'time.time()' und PIL – Mirac7

+0

Wenn Sie Futures verwenden, fand ich [as_completed] (https://docs.python.org/3/library/concurrent.futures.html # concurrent.futures.as_completed) ist sehr hilfreich, um Ereignisse zu verarbeiten, wenn sie fertig sind. –

Antwort

3

Sieht so aus, als ob Sie nach asyncio.wait mit return_when=asyncio.FIRST_COMPLETED suchen.

def fetch(x): 
    sleep() 

async def main(): 
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)] 
    while futures: 
     done, futures = await asyncio.wait(futures, 
      loop=loop, return_when=asyncio.FIRST_COMPLETED) 
     for f in done: 
      await f 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main())