2017-10-16 2 views
3

Ich lese die Dokumentation für Protocol und Transport Klassen in asyncio Paket. Speziell:Wie erstellt man einen benutzerdefinierten Transport für asyncio?

Bei der Unterklasse einer Protokollklasse wird empfohlen, bestimmte Methoden zu überschreiben. Diese Methoden sind Callbacks: Sie werden bei bestimmten Ereignissen vom Transport aufgerufen (zum Beispiel wenn Daten empfangen werden); Sie sollten sie nicht selbst anrufen, , es sei denn, Sie implementieren einen Transport.

Hervorhebungen

Also, im Prinzip sollte es möglich sein, einen Transport zu implementieren, aber ...

Transports sind Klassen von asyncio, um verschiedene abstrakte bereitgestellt Arten von Kommunikationskanälen. Im Allgemeinen werden Sie einen Transport nicht selbst instanziieren; Stattdessen rufen Sie eine AbstractEventLoop-Methode(welche?), die den Transport erstellen und versuchen, den zugrunde liegenden Kommunikationskanal zu initiieren, ruft Sie zurück, wenn es erfolgreich ist.

Wieder Hervorhebungen

den Abschnitt über AbstractEventLoop Lesen entlang und über Ich sehe keine Möglichkeit, einen benutzerdefinierten Transport zu schaffen. Die nächste, die es kommt, ist in AbstractEventLoop.create_connection(protocol_factory, host=...), die nur bedeutet, dass es eine Art von Steckdose erstellen wird ...

Nun, mein ultimatives Ziel ist es, einen benutzerdefinierten Transport, der keine Art von Steckdose ist (vielleicht eine StringIO, vielleicht ein Datenbankverbindungscursor, vielleicht ein anderes Protokoll).


Also, das ist nur ein gut gemeinte, aber nie Fehler in der Dokumentation implementiert, oder gibt es tatsächlich eine Möglichkeit, individuellen Transport zu implementieren, ohne asyncio Affen-Patchen und Erstgeborenen zu opfern?

Antwort

4

ist ein bisschen Kesselblech Code benötigt, obwohl es nicht zu viel ist:

from asyncio import streams, transports, get_event_loop 


class CustomTransport(transports.Transport): 

    def __init__(self, loop, protocol, *args, **kwargs): 
     self._loop = loop 
     self._protocol = protocol 

    def get_protocol(self): 
     return self._protocol 

    def set_protocol(self, protocol): 
     return self._protocol 

    # Implement read/write methods 

    # [...] 


async def custom_create_connection(protocol_factory, *args, **kwargs): 
    loop = get_event_loop() 
    protocol = protocol_factory() 
    transport = CustomTransport(loop, protocol, *args, **kwargs) 
    return transport, protocol 


async def custom_open_connection(*args, **kwargs): 
    reader = streams.StreamReader() 
    protocol = streams.StreamReaderProtocol(reader) 
    factory = lambda: protocol 
    transport, _ = await custom_create_connection(factory, *args, **kwargs) 
    writer = streams.StreamWriter(transport, protocol, reader) 
    return reader, writer 
Verwandte Themen