Ich versuche zu lernen, (idiomatisch) Python 3.4's asyncio
zu verwenden. Mein größter Stolperstein ist, wie Korotinen, die ständig Daten konsumieren, "verkettet" werden, der Status damit aktualisiert wird und dieser Zustand von einer anderen Koroutine verwendet werden kann.Wie verbinde ich asyncio.coroutines, die kontinuierlich Daten produzieren und konsumieren?
Das beobachtbare Verhalten, das ich von diesem Beispielprogramm erwarte, besteht einfach darin, periodisch über die Summe der von einem Subprozess empfangenen Zahlen zu berichten. Die Meldung sollte ungefähr mit der gleichen Rate erfolgen, mit der das Source
-Objekt Zahlen aus dem Subprozess erhält. Die IO-Blockierung in der Berichtsfunktion sollte das Lesen aus dem Subprozess nicht blockieren. Wenn die Berichtsfunktion länger als eine Iteration des Lesens von dem Unterprozess blockiert, ist es mir egal, ob sie überspringt oder einen Haufen auf einmal meldet; aber es sollte ungefähr so viele Iterationen von reporter()
geben, wie es von expect_exact()
über einen ausreichend langen Zeitrahmen gibt.
#!/usr/bin/python3
import asyncio
import pexpect
class Source:
def __init__(self):
self.flag = asyncio.Event()
self.sum = 0
def start(self):
self.flag.set()
def stop(self):
self.flag.clear()
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
p = pexpect.spawn(
"python -c "
"'import random, time\n"
"while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
while self.flag.is_set():
yield from p.expect_exact('\n', async=True)
self.sum += int(p.before)
p.terminate()
@asyncio.coroutine
def reporter(source):
while True:
# Something like:
new_sum = yield from source # ???
print("New sum is: {:d}".format(new_sum))
# Potentially some other blocking operation
yield from limited_throughput.write(new_sum)
def main():
loop = asyncio.get_event_loop()
source = Source()
loop.call_later(1, source.start)
loop.call_later(11, source.stop)
# Again, not sure what goes here...
asyncio.async(reporter(source))
loop.run_until_complete(source.run())
loop.close()
if __name__ == '__main__':
main()
Dieses Beispiel erfordert pexpect
von git installiert werden; Sie könnten nur ersetzen, wie leicht run()
mit:
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
while self.flag.is_set():
value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
self.sum += value
Aber die wirklichen subprocess ich in Bedürfnissen interessiert bin in einem pty
ausgeführt werden, was meiner Meinung nach bedeutet der mitgelieferte subprocess Transport/Protokollrahmen in asyncio
nicht sein ausreichend dafür. Der Punkt ist, dass die Quelle der asynchronen Aktivität eine Coroutine ist, die mit yield from
verwendet werden kann.
Beachten Sie, dass die reporter()
-Funktion in diesem Beispiel kein gültiger Code ist; Mein Problem ist, dass ich nicht weiß, was da drin sein soll. Idealerweise möchte ich den reporter()
Code von run()
getrennt halten; Der Sinn dieser Übung besteht darin, zu sehen, wie komplexere Programme mithilfe der Komponenten in asyncio
in kleinere Einheiten von Code zerlegt werden können.
Gibt es eine Möglichkeit, diese Art von Verhalten mit dem Modul asyncio
zu strukturieren?