2017-05-31 5 views
0

erstellen Ich versuche, zwei Koroutinen hinzufügen Schleife asyncio und erhalte eine Fehlermeldung:Python3 Asyncio eine sekundäre Verbindung von einem anfänglichen

RuntimeError: This event loop is already running 

Mein Ziel ist es, einen Server zu kommunizieren (dh ich habe keine Kontrolle über). Dieser Server erwartet eine erste Verbindung vom Client. Der Server stellte dann dem Client auf dieser Verbindung einen Port zur Verfügung. Der Client muss diesen Port verwenden, um eine zweite Verbindung zu erstellen. Diese zweite Verbindung wird vom Server verwendet, um unaufgeforderte Nachrichten an den Client zu senden. Die erste Verbindung bleibt weiterhin für andere Zweiwege-Kommunikationen bestehen.

Um dieses Szenario zu erstellen, habe ich einige Code, der den Fehler reproduziert:

class Connection(): 
    def __init__(self, ip, port, ioloop): 
     self.ip = ip 
     self.port = port 
     self.ioloop = ioloop 
     self.reader, self.writer = None, None 
     self.protocol = None 
     self.fileno = None 

    async def __aenter__(self): 
     # Applicable when doing 'with Connection(...' 
     log.info("Entering and Creating Connection") 
     self.reader, self.writer = (
      await asyncio.open_connection(self.ip, self.port, loop=self.ioloop) 
     ) 
     self.protocol = self.writer.transport.get_protocol() 
     self.fileno = self.writer.transport.get_extra_info('socket').fileno() 

     log.info(f"Created connection {self}") 
     return self 

    async def __aexit__(self, *args): 
     # Applicable when doing 'with Connection(...' 
     log.info(f"Exiting and Destroying Connection {self}") 
     if self.writer: 
      self.writer.close() 

    def __await__(self): 
     # Applicable when doing 'await Connection(...' 
     return self.__aenter__().__await__() 

    def __repr__(self): 
     return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]" 

    async def send_recv_message(self, message): 
     log.debug(f"send: '{message}'") 
     self.writer.write(message.encode()) 
     await self.writer.drain() 

     log.debug("awaiting data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv: '{data}'") 
     return data 


class ServerConnection(Connection): 
    async def setup_connection(self): 
     event_port = 8889 # Assume this came from the server 
     print("In setup connection") 
     event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop) 
     self.ioloop.run_until_complete(event_connection.recv_message()) 

class EventConnection(Connection): 
    async def recv_message(self): 
     log.debug("awaiting recv-only data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv only: '{data}'") 
     return data 


async def main(loop): 
    client1 = await ServerConnection('127.0.0.1', 8888, loop) 
    await client1.setup_connection() 
    await client1.send_recv_message("Hello1") 
    await client1.send_recv_message("Hello2") 
    await asyncio.sleep(5) 

if __name__ == '__main__': 
    #logging.basicConfig(level=logging.INFO) 
    logging.basicConfig(level=logging.DEBUG) 
    log = logging.getLogger() 
    ioloop = asyncio.get_event_loop() 
    print('starting loop') 
    ioloop.run_until_complete(main(ioloop)) 
    print('completed loop') 
    ioloop.close() 

Der Fehler tritt in ServerConnection.setup_connection() -Methode, wo run_until_complete aufgerufen wird.

Ich mache wahrscheinlich etwas wegen mangelndem Verständnis asyncio falsch. Grundsätzlich, wie richte ich eine sekundäre Verbindung ein, die während der Einrichtung der ersten Verbindung Ereignisbenachrichtigungen erhalten wird (unaufgefordert)?

Danke.

Followup

Da der Code sehr ähnlich ist (einige Änderungen, um es mehr Funktionen hinzuzufügen), ich hoffe, es ist nicht schlecht Etikette ist auf den ursprünglichen Beitrag followup als der resultierenden Fehler immer noch das gleiche ist.

Das neue Problem besteht darin, dass die recv_message die Methode "proced_data" aufruft, wenn sie die nicht angeforderte Nachricht empfängt (die von EventConnection empfangen wird). Ich möchte Process_data zu einer Zukunft machen, damit recv_message beendet wird (Ioloop sollte aufhören). Die secure_future würde es dann aufheben und weiter laufen, um ServerConnection zu verwenden, um eine Anfrage/Antwort an den Server zu machen. Bevor es das tut, muss es jedoch zu einem Benutzercode gehen (repräsentiert durch external_command() und von dem ich es vorziehen würde, den Async-Inhalt auszublenden). Dies würde es wieder synchron machen. Sobald sie das getan haben, was sie benötigen, sollten sie execute_command auf ServerConnection aufrufen, wodurch die Schleife erneut gestartet wird.

Das Problem ist, meine Erwartung für die Verwendung von ensure_future nicht aus, da es scheint, die Schleife nicht aufhören zu laufen. Wenn die Ausführung des Codes den Befehl execute_command erreicht, der run_until_complete ausführt, tritt eine Ausnahme mit dem Fehler "Diese Ereignisschleife wird bereits ausgeführt" auf.

Ich habe zwei Fragen:

  1. Wie kann ich es so machen, dass die ioloop stoppen kann nach process_data in ensure_future platziert wird, und anschließend in der Lage sein, es wieder in execute_command zu laufen?

  2. Sobald recv_message etwas erhalten hat, wie können wir es so machen, dass es mehr unerwünschte Daten erhalten kann? Ist es genug/sicher, einfach zu verwenden, um sich selbst noch einmal anzurufen?

Hier ist der Beispielcode, der dieses Problem simuliert.

client1 = None 

class ServerConnection(Connection): 
    connection_type = 'Server Connection' 
    async def setup_connection(self): 
     event_port = 8889 # Assume this came from the server 
     print("In setup connection") 
     event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop) 
     asyncio.ensure_future(event_connection.recv_message()) 

    async def _execute_command(self, data): 
     return await self.send_recv_message(data) 

    def execute_command(self, data): 
     response_str = self.ioloop.run_until_complete(self._execute_command(data)) 
     print(f"exec cmd response_str: {response_str}") 

    def external_command(self, data): 
     self.execute_command(data) 


class EventConnection(Connection): 
    connection_type = 'Event Connection' 
    async def recv_message(self): 
     global client1 
     log.debug("awaiting recv-only data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv-only: '{data}'") 
     asyncio.ensure_future(self.process_data(data)) 
     asyncio.ensure_future(self.recv_message()) 

    async def process_data(self, data): 
     global client1 
     await client1.external_command(data) 


async def main(ioloop): 
    global client1 
    client1 = await ServerConnection('127.0.0.1', 8888, ioloop) 
    await client1.setup_connection() 
    print(f"after connection setup loop running is {ioloop.is_running()}") 
    await client1.send_recv_message("Hello1") 
    print(f"after Hello1 loop running is {ioloop.is_running()}") 
    await client1.send_recv_message("Hello2") 
    print(f"after Hello2 loop running is {ioloop.is_running()}") 
    while True: 
     print(f"inside while loop running is {ioloop.is_running()}") 
     t = 10 
     print(f"asyncio sleep {t} sec") 
     await asyncio.sleep(t) 


if __name__ == '__main__': 
    logging.basicConfig(level=logging.DEBUG) 
    log = logging.getLogger() 
    ioloop = asyncio.get_event_loop() 
    print('starting loop') 
    ioloop.run_until_complete(main(ioloop)) 
    print('completed loop') 
    ioloop.close() 
+0

Sie wollen 2-Anschluss zur gleichen Zeit schaffen? Warum nicht 'ayncio.gather' verwenden? Mit dieser Methode können Sie 2 asynchrone Aktionen starten ... –

+0

asyncio.gather scheint aus zwei Gründen nicht auf meinen Fall zutreffen. Einer, der die Ergebnisse in aufgelisteter Reihenfolge sammeln soll und der andere, dass er alle Zukünfte in einer Liste zusammenzufassen scheint. In meinem Fall möchte ich es jetzt eine Zukunft und die zweite Zukunft (die zweite Verbindung) später geben, nachdem es den Hafen von der ersten Zukunft erhalten hat. – bhairav

Antwort

0

Versuchen ersetzen:

self.ioloop.run_until_complete 

Mit

await 
+0

Ich hatte das versucht, aber es würde warten auf EventConnection recv_message Methode warten. Ich ersetzte es tatsächlich mit asyncio.ensure_future und das schien den Trick getan zu haben. Allerdings habe ich noch eine weitere Komplexitätsebene, auf die ich wieder ein ähnliches Problem stieß (ich habe dieses Framework langsam so aufgebaut, wie ich es brauche). Fügt es als Update zur ursprünglichen Frage hinzu. – bhairav

Verwandte Themen