2015-03-29 2 views
5

Ich versuche zu lernen, (idiomatisch) Python 3.4's asyncio zu verwenden. Mein größter Stolperstein ist, wie Korotinen, die ständig Daten konsumieren, "verkettet" werden, der Status damit aktualisiert wird und dieser Zustand von einer anderen Koroutine verwendet werden kann.Wie verbinde ich asyncio.coroutines, die kontinuierlich Daten produzieren und konsumieren?

Das beobachtbare Verhalten, das ich von diesem Beispielprogramm erwarte, besteht einfach darin, periodisch über die Summe der von einem Subprozess empfangenen Zahlen zu berichten. Die Meldung sollte ungefähr mit der gleichen Rate erfolgen, mit der das Source-Objekt Zahlen aus dem Subprozess erhält. Die IO-Blockierung in der Berichtsfunktion sollte das Lesen aus dem Subprozess nicht blockieren. Wenn die Berichtsfunktion länger als eine Iteration des Lesens von dem Unterprozess blockiert, ist es mir egal, ob sie überspringt oder einen Haufen auf einmal meldet; aber es sollte ungefähr so ​​viele Iterationen von reporter() geben, wie es von expect_exact() über einen ausreichend langen Zeitrahmen gibt.

#!/usr/bin/python3 
import asyncio 
import pexpect 

class Source: 

    def __init__(self): 
     self.flag = asyncio.Event() 
     self.sum = 0 

    def start(self): 
     self.flag.set() 

    def stop(self): 
     self.flag.clear() 

    @asyncio.coroutine 
    def run(self): 
     yield from self.flag.wait() 

     p = pexpect.spawn(
      "python -c " 
      "'import random, time\n" 
      "while True: print(random.choice((-1, 1))); time.sleep(0.5)'") 

     while self.flag.is_set(): 
      yield from p.expect_exact('\n', async=True) 
      self.sum += int(p.before) 

     p.terminate() 

@asyncio.coroutine 
def reporter(source): 
    while True: 
     # Something like: 
     new_sum = yield from source # ??? 
     print("New sum is: {:d}".format(new_sum)) 
     # Potentially some other blocking operation 
     yield from limited_throughput.write(new_sum) 

def main(): 
    loop = asyncio.get_event_loop() 

    source = Source() 
    loop.call_later(1, source.start) 
    loop.call_later(11, source.stop) 

    # Again, not sure what goes here... 
    asyncio.async(reporter(source)) 

    loop.run_until_complete(source.run()) 
    loop.close() 

if __name__ == '__main__': 
    main() 

Dieses Beispiel erfordert pexpect von git installiert werden; Sie könnten nur ersetzen, wie leicht run() mit:

@asyncio.coroutine 
def run(self): 
    yield from self.flag.wait() 

    while self.flag.is_set(): 
     value = yield from asyncio.sleep(0.5, random.choice((-1, 1))) 
     self.sum += value 

Aber die wirklichen subprocess ich in Bedürfnissen interessiert bin in einem pty ausgeführt werden, was meiner Meinung nach bedeutet der mitgelieferte subprocess Transport/Protokollrahmen in asyncio nicht sein ausreichend dafür. Der Punkt ist, dass die Quelle der asynchronen Aktivität eine Coroutine ist, die mit yield from verwendet werden kann.

Beachten Sie, dass die reporter()-Funktion in diesem Beispiel kein gültiger Code ist; Mein Problem ist, dass ich nicht weiß, was da drin sein soll. Idealerweise möchte ich den reporter() Code von run() getrennt halten; Der Sinn dieser Übung besteht darin, zu sehen, wie komplexere Programme mithilfe der Komponenten in asyncio in kleinere Einheiten von Code zerlegt werden können.

Gibt es eine Möglichkeit, diese Art von Verhalten mit dem Modul asyncio zu strukturieren?

Antwort

5

Die Sperrgrundelemente und Warteschlangen in asyncio selbst bieten einige Mechanismen, um dies zu tun.

Bedingungen

Die asyncio.Condition() bietet eine Möglichkeit einer Bedingung mitzuteilen. Verwenden Sie dies, wenn es nicht wichtig ist, wenn Sie einige Ereignisse löschen.

class Source: 

    def __init__(self): 
     self.flag = asyncio.Event() 
     self.sum = 0 

     # For consumers 
     self.ready = asyncio.Condition() 

    def start(self): 
     self.flag.set() 

    def stop(self): 
     self.flag.clear() 

    @asyncio.coroutine 
    def run(self): 
     yield from self.flag.wait() 

     p = pexpect.spawn(
      "python -c " 
      "'import random, time\n" 
      "while True: print(random.choice((-1, 1))); time.sleep(0.5)'") 

     while self.flag.is_set(): 
      yield from p.expect_exact('\n', async=True) 
      self.sum += int(p.before) 
      with (yield from self.ready): 
       self.ready.notify_all() # Or just notify() depending on situation 

     p.terminate() 

    @asyncio.coroutine 
    def read(self): 
     with (yield from self.ready): 
      yield from self.ready.wait() 
      return self.sum 


@asyncio.coroutine 
def reporter(source): 
    while True: 
     # Something like: 
     new_sum = yield from source.read() 
     print("New sum is: {:d}".format(new_sum)) 
     # Other potentially blocking stuff in here 

Queues

Die asyncio.Queue() können Sie Ihre Daten in eine Warteschlange gestellt (entweder LIFO oder FIFO) und haben etwas anderes von ihr lesen. Verwenden Sie dies, wenn Sie unbedingt auf jedes Ereignis reagieren möchten, selbst wenn Ihr Verbraucher (rechtzeitig) zurückkommt. Wenn Sie die Größe der Warteschlange begrenzen, wird Ihr Producer möglicherweise blockieren, wenn Ihr Consumer langsam genug ist.

Beachten Sie, dass wir damit auch sum in eine lokale Variable umwandeln können.

#!/usr/bin/python3 
import asyncio 
import pexpect 

class Source: 

    def __init__(self): 
     self.flag = asyncio.Event() 
     # NOTE: self.sum removed! 

     # For consumers 
     self.output = asyncio.Queue() 

    def start(self): 
     self.flag.set() 

    def stop(self): 
     self.flag.clear() 

    @asyncio.coroutine 
    def run(self): 
     yield from self.flag.wait() 

     sum = 0 

     p = pexpect.spawn(
      "python -c " 
      "'import random, time\n" 
      "while True: print(random.choice((-1, 1))); time.sleep(0.5)'") 

     while self.flag.is_set(): 
      yield from p.expect_exact('\n', async=True) 
      sum += int(p.before) 
      yield from self.output.put(sum) 

     p.terminate() 

    @asyncio.coroutine 
    def read(self): 
     return (yield from self.output.get()) 

@asyncio.coroutine 
def reporter(source): 
    while True: 
     # Something like: 
     new_sum = yield from source.read() 
     print("New sum is: {:d}".format(new_sum)) 
     # Other potentially blocking stuff here 

Beachten Sie, dass Python 3.4.4 task_done() und join() Methoden zum Queue hinzufügen, damit Sie ordnungsgemäß verarbeiten, alles zu beenden, wenn Sie die Verbraucher wissen, beendet ist (soweit zutreffend).

Verwandte Themen