2016-07-31 8 views
2

Ich schreibe eine App auf dem asyncio-Framework basiert. Diese App interagiert mit einer API, die ein Ratenlimit hat (maximal 2 Aufrufe pro Sekunde). Also habe ich Methoden, die mit einer API interagieren, auf den Sellerie verschoben, um ihn als Ratenbegrenzer zu verwenden. Aber es sieht wie ein Overhead aus.Asyncio & rate limiting

Es gibt Möglichkeiten, um eine neue asyncio Ereignisschleife (oder etwas anderes) zu erstellen, die die Ausführung eines Coroutins nicht mehr als n pro Sekunde garantiert?

Antwort

2

Ich glaube, Sie sind in der Lage einen Zyklus wie folgt zu schreiben:

while True: 
    t0 = loop.time() 
    await make_io_call() 
    dt = loop.time() - t0 
    if dt < 0.5: 
     await asyncio.sleep(0.5 - dt, loop=loop) 
+0

Dank drucken! Auf diese Weise habe ich einen Dekorateur gemacht, während ich auf eine Antwort warte. Es scheint, als ob dies ein einziger richtiger Ansatz ist. Das ist wahr? –

+0

Was meinen Sie mit "single proper approach"? Für mich ist es der einfachste und offensichtlichste Weg, um das Problem zu lösen, aber ich kann ein Dutzend überkomplizierter Lösungen einladen. –

+0

Es ist genau das, was ich hören will :) Danke –

3

Die akzeptierte Antwort richtig ist. Beachten Sie jedoch, dass man normalerweise 2QPS so nah wie möglich erreichen möchte. Diese Methode bietet keine Parallelisierung, was ein Problem darstellen könnte, wenn make_io_call() länger als eine Sekunde zur Ausführung benötigt. Eine bessere Lösung wäre, einen Semaphor an make_io_call zu übergeben, mit dem er wissen kann, ob er mit der Ausführung beginnen kann oder nicht.

Hier ist eine solche Implementierung: RateLimitingSemaphore wird nur dann seinen Kontext freigeben, wenn das Ratenlimit unter die Anforderung fällt.

import asyncio 
from collections import deque 
from datetime import datetime 

class RateLimitingSemaphore: 
    def __init__(self, qps_limit, loop=None): 
     self.loop = loop or asyncio.get_event_loop() 
     self.qps_limit = qps_limit 

     # The number of calls that are queued up, waiting for their turn. 
     self.queued_calls = 0 

     # The times of the last N executions, where N=qps_limit - this should allow us to calculate the QPS within the 
     # last ~ second. Note that this also allows us to schedule the first N executions immediately. 
     self.call_times = deque() 

    async def __aenter__(self): 
     self.queued_calls += 1 
     while True: 
      cur_rate = 0 
      if len(self.call_times) == self.qps_limit: 
       cur_rate = len(self.call_times)/(self.loop.time() - self.call_times[0]) 
      if cur_rate < self.qps_limit: 
       break 
      interval = 1./self.qps_limit 
      elapsed_time = self.loop.time() - self.call_times[-1] 
      await asyncio.sleep(self.queued_calls * interval - elapsed_time) 
     self.queued_calls -= 1 

     if len(self.call_times) == self.qps_limit: 
      self.call_times.popleft() 
     self.call_times.append(self.loop.time()) 

    async def __aexit__(self, exc_type, exc, tb): 
     pass 


async def test(qps): 
    executions = 0 
    async def io_operation(semaphore): 
     async with semaphore: 
      nonlocal executions 
      executions += 1 

    semaphore = RateLimitingSemaphore(qps) 
    start = datetime.now() 
    await asyncio.wait([io_operation(semaphore) for i in range(5*qps)]) 
    dt = (datetime.now() - start).total_seconds() 
    print('Desired QPS:', qps, 'Achieved QPS:', executions/dt) 

if __name__ == "__main__": 
    asyncio.get_event_loop().run_until_complete(test(100)) 
    asyncio.get_event_loop().close() 

Wird Desired QPS: 100 Achieved QPS: 99.82723898022084