4

Ich hoffe, jemand kann mir hier helfen.Python 3.5 asyncio Coroutine auf Ereignisschleife aus synchronem Code in anderen Thread ausführen

Ich habe ein Objekt, das über die Fähigkeit verfügt, Attribute zurückzugeben, die Coroutine-Objekte zurückgeben. Das funktioniert wunderbar, aber ich habe eine Situation, in der ich die Ergebnisse des Coroutine-Objekts aus synchronem Code in einem separaten Thread abrufen muss, während die Ereignisschleife gerade ausgeführt wird. Der Code, den ich herauskommen, ist:

def get_sync(self, key: str, default: typing.Any=None) -> typing.Any: 
    """ 
    Get an attribute synchronously and safely. 

    Note: 
     This does nothing special if an attribute is synchronous. It only 
     really has a use for asynchronous attributes. It processes 
     asynchronous attributes synchronously, blocking everything until 
     the attribute is processed. This helps when running SQL code that 
     cannot run asynchronously in coroutines. 

    Args: 
     key (str): The Config object's attribute name, as a string. 
     default (Any): The value to use if the Config object does not have 
      the given attribute. Defaults to None. 

    Returns: 
     Any: The vale of the Config object's attribute, or the default 
     value if the Config object does not have the given attribute. 
    """ 
    ret = self.get(key, default) 

    if asyncio.iscoroutine(ret): 
     if loop.is_running(): 
      loop2 = asyncio.new_event_loop() 
      try: 
       ret = loop2.run_until_complete(ret) 

      finally: 
       loop2.close() 
     else: 
      ret = loop.run_until_complete(ret) 

    return ret 

Was ich suche eine sichere Art und Weise ist in einer Multithread-Umgebung synchron die Ergebnisse eines Koroutine Objekts zu erhalten. self.get() kann ein Coroutine-Objekt zurückgeben, für Attribute, die ich eingestellt habe, um sie zur Verfügung zu stellen. Die Probleme, die ich gefunden habe, sind: Wenn die Ereignisschleife läuft oder nicht. Nach ein paar Stunden auf Stack-Überlauf und ein paar anderen Websites suchen, ist meine (gebrochene) Lösung oben. Wenn die Schleife ausgeführt wird, mache ich eine neue Ereignisschleife und führe meine Coroutine in der neuen Ereignisschleife aus. Dies funktioniert, außer dass der Code für immer auf der ret = loop2.run_until_complete(ret) Zeile hängt.

Gerade jetzt, ich habe die folgenden Szenarien mit den Ergebnissen:

  1. Ergebnisse von self.get() ist kein Koroutine
    • Gibt Ergebnisse. [Good]
  2. Ergebnisse self.get() ist eine Koroutine & Ereignisschleife nicht (im Allgemeinen als die Ereignisschleife in demselben Thread) läuft
    • Returns Ergebnisse. [Good]
  3. Ergebnisse self.get() ist ein Koroutine & Ereignisschleife (im Grunde in einem anderen Thread als der Ereignisschleife) ausgeführt wird
    • Hangs ewig warten auf Ergebnisse. [Bad]

Wer weiß, wie ich über die Festsetzung der schlechten Ergebnis gehen kann, so kann ich den Wert bekommen die ich brauche? Vielen Dank.

Ich hoffe, ich habe hier einen Sinn gemacht.

Ich habe einen guten und gültigen Grund, Threads zu verwenden; speziell verwende ich SQLAlchemy, das nicht async ist, und ich stuple den SQLAlchemy-Code zu einem ThreadPoolExecutor, um es sicher zu handhaben. Ich muss jedoch in der Lage sein, diese asynchronen Attribute innerhalb dieser Threads für den SQLAlchemy-Code abzufragen, um bestimmte Konfigurationswerte sicher zu erhalten. Und nein, ich werde nicht von SQLAlchemy zu einem anderen System wechseln, nur um das zu erreichen, was ich brauche, also biete keine Alternativen an. Das Projekt ist zu weit entfernt, um etwas so Grundlegendes zu ändern.

Ich versuchte mit asyncio.run_coroutine_threadsafe() und loop.call_soon_threadsafe() und beide fehlgeschlagen. Bis jetzt ist das am weitesten gekommen, um es zum Laufen zu bringen, ich habe das Gefühl, dass ich gerade etwas Offensichtliches vermisse.

Wenn ich eine Chance bekomme, werde ich einen Code schreiben, der ein Beispiel für das Problem bietet.

Ok, ich habe einen Beispielfall implementiert, und es hat so funktioniert, wie ich es erwarten würde. Also ist es wahrscheinlich, dass mein Problem anderswo im Code ist. Ich lasse das offen und werde die Frage ändern, um zu meinem wirklichen Problem zu passen, wenn ich es brauche.

Hat jemand irgendwelche mögliche Ideen, warum ein concurrent.futures.Future von asyncio.run_coroutine_threadsafe() für immer hängen würde, anstatt ein Ergebnis zurückzugeben?

Mein Beispiel-Code, dass nicht mein Fehler dupliziert leider unter:

import asyncio 
import typing 

loop = asyncio.get_event_loop() 

class ConfigSimpleAttr: 
    __slots__ = ('value', '_is_async') 

    def __init__(
     self, 
     value: typing.Any, 
     is_async: bool=False 
    ): 
     self.value = value 
     self._is_async = is_async 

    async def _get_async(self): 
     return self.value 

    def __get__(self, inst, cls): 
     if self._is_async and loop.is_running(): 
      return self._get_async() 
     else: 
      return self.value 

class BaseConfig: 
    __slots__ =() 

    attr1 = ConfigSimpleAttr(10, True) 
    attr2 = ConfigSimpleAttr(20, True)  

    def get(self, key: str, default: typing.Any=None) -> typing.Any: 
     return getattr(self, key, default) 

    def get_sync(self, key: str, default: typing.Any=None) -> typing.Any: 
     ret = self.get(key, default) 

     if asyncio.iscoroutine(ret): 
      if loop.is_running(): 
       fut = asyncio.run_coroutine_threadsafe(ret, loop) 
       print(fut, fut.running()) 
       ret = fut.result() 
      else: 
       ret = loop.run_until_complete(ret) 

     return ret 

config = BaseConfig() 

def example_func(): 
    return config.get_sync('attr1') 

async def main(): 
    a1 = await loop.run_in_executor(None, example_func) 
    a2 = await config.attr2 
    val = a1 + a2 
    print('{a1} + {a2} = {val}'.format(a1=a1, a2=a2, val=val)) 
    return val 

loop.run_until_complete(main()) 

Dies ist die abgespeckte ist Version genau das, was mein Code tut, und das Beispiel funktioniert, auch wenn meine tatsächliche Anwendung nicht. Ich bin so weit fest, wo ich nach Antworten suchen kann. Vorschläge sind willkommen, um herauszufinden, wo ich mein Problem "steckte für immer" aufspüren kann, auch wenn mein Code oben das Problem nicht wirklich dupliziert.

Antwort

0

Ok, ich habe meinen Code funktioniert, indem ich einen anderen Ansatz gewählt habe. Das Problem war mit der Verwendung von etwas verbunden, das Datei-IO hatte, das ich in eine Coroutine umwandelte, die loop.run_in_executor() für die Datei-IO-Komponenten verwendete. Dann versuchte ich, dies in einer sync-Funktion zu verwenden, die von einem anderen Thread aufgerufen wurde, der mit einem anderen loop.run_in_executor() für diese Funktion verarbeitet wurde. Dies ist eine sehr wichtige Routine in meinem Code (während der Ausführung meines Kurzcode-Codes wahrscheinlich millionenfach oder öfter genannt), und ich traf die Entscheidung, dass meine Logik einfach zu kompliziert wurde. Also ... ich habe es unkompliziert gemacht. Wenn ich nun die Datei-IO-Komponenten asynchron verwenden möchte, verwende ich explizit meine "get_async()" -Methode, ansonsten verwende ich mein Attribut über normalen Attributzugriff.

Durch die Beseitigung der Komplexität meiner Logik, machte es den Code sauberer, einfacher zu verstehen, und noch wichtiger, es funktioniert tatsächlich. Ich bin zwar nicht 100% sicher, dass ich die Ursache des Problems kenne (ich glaube, es hat etwas damit zu tun, dass ein Thread ein Attribut verarbeitet, das dann wiederum einen anderen Thread startet, der versucht, das Attribut zu lesen, bevor es verarbeitet wird verursachte etwas wie eine Race Condition und stoppte meinen Code, aber ich konnte den Fehler nie außerhalb meiner Anwendung kopieren, leider um es komplett zu beweisen), ich konnte es hinter mir lassen und meine Entwicklungsbemühungen fortsetzen.

1

Es ist sehr unwahrscheinlich, dass Sie gleichzeitig mehrere Ereignisschleifen ausgeführt werden müssen, so dass dieser Teil sieht ganz falsch:

if loop.is_running(): 
     loop2 = asyncio.new_event_loop() 
     try: 
      ret = loop2.run_until_complete(ret) 

     finally: 
      loop2.close() 
    else: 
     ret = loop.run_until_complete(ret) 

Selbst testen, ob die Schleife läuft oder nicht nicht zu sein scheint der richtige Ansatz. Es ist wahrscheinlich besser geben explizit die (nur) Laufschleife get_sync und planen Sie den Koroutine mit run_coroutine_threadsafe:

def get_sync(self, key, loop): 
    ret = self.get(key, default) 
    if not asyncio.iscoroutine(ret): 
     return ret 
    future = asyncio.run_coroutine_threadsafe(ret, loop) 
    return future.result() 

EDIT: Hanging Probleme zu Aufgaben in Beziehung gesetzt werden kann in der falschen Schleife geplant werden (zB zu vergessen das optionale Argument loop beim Aufruf einer Coroutine). Diese Art von Problem sollte einfacher mit dem PR 303 (jetzt zusammengeführt) zu debuggen sein: ein RuntimeError wird stattdessen ausgelöst, wenn die Schleife und die Zukunft nicht übereinstimmen. Vielleicht möchten Sie Ihre Tests mit der neuesten Version von asyncio ausführen.

+0

Ja, darauf geschaltet, genau das gleiche Problem zu bekommen. Ich habe vor, es jetzt etwas anders zu machen. Ich denke, dass ich das Problem besser herausgefunden habe und hoffe, ein besseres Beispiel zu haben, wenn ich den Fehler replizieren kann, wäre es großartig. Das Problem ist, dass etwas hunderte Male ausgeführt wird, und es dauert eine Weile, bis es stecken bleibt. –

+0

@CliffHill Siehe, ob meine Änderung hilft. – Vincent

+0

Danke, aber Sie hatten am Ende eine Wiederherstellung meines ursprünglichen Codes, wo ich das Problem hatte. Zumindest habe ich eine Bestätigung, dass meine ursprüngliche Idee auf dem richtigen Weg war. –

Verwandte Themen