2016-01-10 20 views
12

Ich schreibe ein Tool, das eine Verbindung zu X-Anzahl von UNIX-Sockets herstellt, einen Befehl sendet und die Ausgabe im lokalen Dateisystem speichert. Es läuft das alle X Sekunden. Um eine gewisse Bereinigung durchzuführen, wenn das Werkzeug Beendigungssignale empfängt, registriere ich eine Funktion (Abschaltung), um Signale zu signalisieren.SIGHUP und Signal.SIGTERM. Diese Funktion bricht alle Aufgaben ab und schließt dann die Ereignisschleife.Ordnungsgemäßes Herunterfahren von asyncio-Tasks

Mein Problem ist, dass ich

RuntimeError: Event loop stopped before Future completed

, wenn ich signal.SIGTERM (kill 'pid') senden. Ich habe die Dokumentation über das Abbrechen von Aufgaben zweimal gelesen, aber ich habe nicht erkannt, was ich hier falsch mache.

Ich bemerkte auch etwas komisches, wenn ich das Beendigungssignal sendete das Programm ist im Schlafmodus und ich sehe im Protokoll, dass es aufweckt die pull_stats() -Coroutine, können Sie dies in den ersten 2 Zeilen des Protokolls sehen .

Log:

21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs** 
21:53:45,857 [23031] [MainThread:pull_stats ] INFO  pull statistics 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,859 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  received stop signal, cancelling tasks... 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  stopping event loop 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  bye, exiting... 
Traceback (most recent call last): 
    File "./pull.py", line 249, in <module> 
    main() 
    File "./pull.py", line 245, in main 
    supervisor(loop, config) 
    File "./pull.py", line 161, in supervisor 
    config['pull']['socket-dir'], storage_dir, loop)) 
    File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 
RuntimeError: Event loop stopped before Future completed. 

Hier ist der Code:

def shutdown(loop): 
    LOGGER.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     LOGGER.info(task.cancel()) 
    LOGGER.info('stopping event loop') 
    loop.stop() 
    LOGGER.info('bye, exiting...') 


def write_file(filename, data): 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     return False 
    else: 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop): 
    connect = asyncio.open_unix_connection(socket_file) 
    reader, writer = yield from asyncio.wait_for(connect, 1) 

    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1] 
    filename = os.path.join(storage_dir, filename) 
    result = yield from loop.run_in_executor(None, write_file, filename, data) 

    return result 


@asyncio.coroutine 
def pull_stats(socket_dir, storage_dir, loop): 
    socket_files = glob.glob(socket_dir + '/*sock*') 
    coroutines = [get(socket_file, cmd, storage_dir, loop) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    status = yield from asyncio.gather(*coroutines) 

    if len(set(status)) == 1 and True in set(status): 
     return True 
    else: 
     return False 


def supervisor(loop, config): 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 

    while True: 
     start_time = int(time.time()) 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to create directory {d}:{e}".format(d=storage_dir, 
                   e=exc) 
      LOGGER.critical(msg) 

     # Launch all connections. 
     result = loop.run_until_complete(pull_stats(
      config['pull']['socket-dir'], storage_dir, loop)) 

     if result: 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       LOGGER.critical("failed to move %s to %s: %s", storage_dir, 
           dst_dir, exc) 
       break 
      else: 
       LOGGER.info('statistics are saved in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      LOGGER.critical('failed to pull stats') 
      shutil.rmtree(storage_dir) 

     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      time.sleep(sleep) 
    loop.close() 
    sys.exit(1) 


def main(): 
    args = docopt(__doc__, version=VERSION) 
    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    config.read(args['--file']) 

    loop = asyncio.get_event_loop() 

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None) 
    LOGGER.setLevel(num_level) 

    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 

Antwort

6

Die Kündigung nicht sofort und erfordert läuft ioloop mit Ausnahme CancelledError gelöst werden. Entfernen Sie ioloop.stop aus dem Herunterfahren und behandeln Ausnahme in Supervisor, um die Dinge funktionieren zu lassen. Unten vereinfachtes Beispiel.

Wichtig ist, aber Sie können Task abbrechen, es hört nur auf/warten auf Ende/Ergebnisse und Schleife wird nicht weitere Ereignisse dafür behandeln. Aber die untere Anfrage/Pipe wird nicht gestoppt.

Vereinfachtes Beispiel:

import asyncio 
import functools 
import logging 
import signal 
import sys 
from concurrent.futures import CancelledError 


def shutdown(loop): 
    logging.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     task.cancel() 
    logging.info('bye, exiting in a minute...')  


@asyncio.coroutine 
def get(i): 
    logging.info('sleep for %d', i) 
    yield from asyncio.sleep(i)  


@asyncio.coroutine 
def pull_stats(): 
    coroutines = [get(i) for i in range(10,20)] 
    status = yield from asyncio.gather(*coroutines) 


def supervisor(loop): 
    try: 
     while True: 
      result = loop.run_until_complete(pull_stats()) 
    except CancelledError: 
     logging.info('CancelledError') 
    loop.close() 
    sys.exit(1) 


def main(): 
    logging.getLogger().setLevel(logging.INFO) 
    loop = asyncio.get_event_loop() 
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 
    supervisor(loop) 


if __name__ == '__main__': 
    main() 

Beachten Sie, dass, wenn Sie nur gather's Zukunft stornieren, werden alle Kinder sowie storniert eingestellt werden.

Und der Schlaf, was

Jeder Empfang eines Signals oder Unterbrechung bewirkt, dass die Programmausführung fortzusetzen. Wenn also der Prozess SIGTERM und den Handler empfangen hat, können Sie mit Python umgehen, um diesen Thread wieder aufzunehmen und sighandler aufzurufen. Aufgrund der Implementierung von ioloop und seiner Signalverarbeitung läuft es nach dem Aufwecken weiter.

+0

ich den Code geändert, wie Sie vorgeschlagen und es fängt die Ausnahme, aber ich sehe immer noch, dass pull_stats() aufgeweckt werden, wenn ich das TERM-Signal senden. In Ihrem Codebeispiel sehe ich das nicht. Ich verstehe deine Aussage über den Schlaf nicht ganz. Schlägst du vor, dass der Schlaf verhindert, dass der Faden gestoppt wird? Außerdem, wie bringe ich die Annullierung in allen Koroutinen voran, damit ich Schritte dort aufräumen kann? Vielen Dank @kwarunek für Ihre Antwort und Ihre Zeit, um ein Codebeispiel zu bieten, sehr geschätzt –

+1

Ich habe ein bisschen über SIGTERM bearbeitet, auch ist es nicht in Beispiel behandelt. – kwarunek

+0

@kwarunke, macht es jetzt Sinn. Wenn die Cancel-Funktion gesendet wird, wird eine Task in der letzten Yield-Zeile fortgesetzt, in der die Coroutine gerade angehalten ist. In meinem Fall bin ich in der Schlaflinie, das Signal kommt herein, der Hauptfaden erwacht aus dem Schlaf, während True alle Futures startet, die dann die Annullierung erhalten, aber coroutines pull_stats und erwachen, aber nicht fortfahren, da sie annulliert werden. Ich versuche immer noch, einen Weg zu finden, den Abbruch abzubrechen, wenn sich das Programm in der Verbindungs-/Empfangs-/Schreibphase befindet, da ich etwas sauber machen möchte. Nochmals vielen Dank für Ihre Hilfe. –

0

Update: Code funktioniert wie erwartet auf Python 3.4.4, siehe meinen Kommentar unten. @kwarunek, als du auf deinem letzten Kommentar erwähnt hast, dass ioloop weiter läuft, habe ich es nicht ganz verstanden, da mein Code funktioniert hat. Wenn der Prozess abgebrochen wird, wird eine Löschung an alle Aufgaben gesendet, die aufwachen. Aber, jetzt sehe ich deinen Punkt, weil das Abbrechen der Aufgaben nicht mit 3.4.4 ausgelöst wird, mit 3.4.2 ist in Ordnung.

21:28:09,004 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:28:11,826 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:11,827 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:11,828 [59441] [MainThread:shutdown] INFO  received stop signal 
21:28:11,828 [59441] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /opt/blue-python/3.4/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:28:11,829 [59441] [MainThread:shutdown] INFO  cancelling task 
21:28:11,829 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
21:28:11,829 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
21:28:21,009 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:21,010 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:21,011 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
2016-01-30 21:28:21,011 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 

während in Python 3.4.2

21:23:51,015 [10219] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:23:55,737 [10219] [MainThread:supervisor] INFO  starting while loop 
21:23:55,737 [10219] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:23:55,740 [10219] [MainThread:shutdown] INFO  received stop signal 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,740 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(0)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(7)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(4)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(5)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(3)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(6)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<pull_stats() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:150> wait_for=<_GatheringFuture pending cb=[Task._wakeup()]> cb=[_raise_stop_error() at /usr/lib/python3.4/asyncio/base_events.py:101]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(2)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,753 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,754 [10219] [MainThread:supervisor] INFO  Received CancelledError exception 
21:23:55,754 [10219] [MainThread:supervisor] INFO  waiting for threads to finish any pending IO tasks 
21:23:55,754 [10219] [MainThread:supervisor] INFO  closing our asyncio loop 
21:23:55,755 [10219] [MainThread:supervisor] INFO  exiting with status 0 

Der Hauptunterschied ist, wenn shutdown() sendet die Löschung gibt es keine Aufgaben aufgeweckt und als Ergebnis wird die While-Schleife durch die nicht angehalten wird, try catch block, der die Löschung behandelt. Wie löse ich das jetzt ?!

hier ist der Code

def shutdown(): 
    """Performs a clean shutdown""" 
    log.info('received stop signal') 
    for task in asyncio.Task.all_tasks(): 
     log.info(task) 
     log.info('cancelling task') 
     task.cancel() 


def write_file(filename, data): 
    """Writes data to a file. 

    Returns: 
     True if succeeds False otherwise. 
    """ 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     log.critical('failed to write data %s', exc) 
     return False 
    else: 
     log.debug('data saved in %s', filename) 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop, executor, timeout): 
    """Fetches data from a UNIX socket. 

    Sends a command to HAProxy over UNIX socket, reads the response and then 
    offloads the writing of the received data to a thread, so we don't block 
    this coroutine. 

    Arguments: 
     socket_file (str): The full path of the UNIX socket file to connect to. 
     cmd (str): The command to send. 
     storage_dir (str): The full path of the directory to save the response. 
     loop (obj): A base event loop from asyncio module. 
     executor (obj): A Threader executor to execute calls asynchronously. 
     timeout (int): Timeout for the connection to the socket. 

    Returns: 
     True if statistics from a UNIX sockets are save False otherwise. 
    """ 
    # try to connect to the UNIX socket 
    connect = asyncio.open_unix_connection(socket_file) 
    log.debug('connecting to UNIX socket %s', socket_file) 
    try: 
     reader, writer = yield from asyncio.wait_for(connect, timeout) 
    except (ConnectionRefusedError, PermissionError, OSError) as exc: 
     log.critical(exc) 
     return False 
    else: 
     log.debug('connection established to UNIX socket %s', socket_file) 

    log.debug('sending command "%s" to UNIX socket %s', cmd, socket_file) 
    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    if len(data) == 0: 
     log.critical('received zero data') 
     return False 

    log.debug('received data from UNIX socket %s', socket_file) 

    suffix = CMD_SUFFIX_MAP.get(cmd.split()[1]) 
    filename = os.path.basename(socket_file) + suffix 
    filename = os.path.join(storage_dir, filename) 
    log.debug('going to save data to %s', filename) 
    # Offload the writing to a thread so we don't block ourselves. 
    result = yield from loop.run_in_executor(executor, 
              write_file, 
              filename, 
              data) 

    return result 


@asyncio.coroutine 
def pull_stats(config, storage_dir, loop, executor): 
    """Launches coroutines for pulling statistics from UNIX sockets. 

    This a delegating routine. 

    Arguments: 
     config (obj): A configParser object which holds configuration. 
     storage_dir (str): The absolute directory path to save the statistics. 
     loop (obj): A base event loop. 
     executor(obj): A ThreadPoolExecutor object. 

    Returns: 
     True if statistics from *all* UNIX sockets are fetched False otherwise. 
    """ 
    # absolute directory path which contains UNIX socket files. 
    socket_dir = config.get('pull', 'socket-dir') 
    timeout = config.getint('pull', 'timeout') 
    socket_files = [f for f in glob.glob(socket_dir + '/*') 
        if is_unix_socket(f)] 

    log.debug('pull statistics') 
    coroutines = [get(socket_file, cmd, storage_dir, loop, executor, timeout) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    # Launch all connections. 
    status = yield from asyncio.gather(*coroutines) 

    return len(set(status)) == 1 and True in set(status) 


def supervisor(loop, config): 
    """Coordinates the pulling of HAProxy statistics from UNIX sockets. 

    This is the client routine which launches requests to all HAProxy 
    UNIX sockets for retrieving statistics and save them to file-system. 
    It runs indefinitely until main program is terminated. 

    Arguments: 
     loop (obj): A base event loop from asyncio module. 
     config (obj): A configParser object which holds configuration. 
    """ 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 
    executor = ThreadPoolExecutor(max_workers=config.getint('pull', 'workers')) 
    exit_code = 1 

    while True: 
     log.info('starting while loop') 
     start_time = int(time.time()) 
     # HAProxy statistics are stored in a directory and we use retrieval 
     # time(seconds since the Epoch) as a name of the directory. 
     # We first store them in a temporary place until we receive statistics 
     # from all UNIX sockets. 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     # If our storage directory can't be created we can't do much, thus 
     # abort main program. 
     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to make directory {d}:{e}".format(d=storage_dir, 
                  e=exc) 
      log.critical(msg) 
      log.critical('a fatal error has occurred, exiting..') 
      break 

     try: 
      log.info('launch the delegating coroutine') 
      result = loop.run_until_complete(pull_stats(config, storage_dir, 
                 loop, executor)) 
      log.info('delegating coroutine finished') 
     except asyncio.CancelledError: 
      log.info('Received CancelledError exception') 
      exit_code = 0 
      break 

     # if and only if we received statistics from all sockets then move 
     # statistics to the permanent directory. 
     # NOTE: when temporary and permanent storage directory are on the same 
     # file-system the move is actual a rename, which is an atomic 
     # operation. 
     if result: 
      log.debug('move %s to %s', storage_dir, dst_dir) 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       log.critical("failed to move %s to %s: %s", storage_dir, 
          dst_dir, exc) 
       log.critical('a fatal error has occurred, exiting..') 
       break 
      else: 
       log.info('statistics are stored in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      log.critical('failed to pull stats') 
      log.debug('removing temporary directory %s', storage_dir) 
      shutil.rmtree(storage_dir) 

     # calculate sleep time which is interval minus elapsed time. 
     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      log.debug('sleeping for %.3fs secs', sleep) 
      time.sleep(sleep) 

    # It is very unlikely that threads haven't finished their job by now, but 
    # they perform disk IO operations which can take some time in certain 
    # situations, thus we want to wait for them in order to perform a clean 
    # shutdown. 
    log.info('waiting for threads to finish any pending IO tasks') 
    executor.shutdown(wait=True) 
    log.info('closing our asyncio loop') 
    loop.close() 
    log.info('exiting with status %s', exit_code) 
    sys.exit(exit_code) 


def main(): 
    """Parses CLI arguments and launches main program.""" 
    args = docopt(__doc__, version=VERSION) 

    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    # Set defaults for all sections 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    # Load configuration from a file. NOTE: ConfigParser doesn't warn if user 
    # sets a filename which doesn't exist, in this case defaults will be used. 
    config.read(args['--file']) 

    if args['--print']: 
     for section in sorted(DEFAULT_OPTIONS): 
      print("[{}]".format(section)) 
      for key, value in sorted(DEFAULT_OPTIONS[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 
    if args['--print-conf']: 
     for section in sorted(config): 
      print("[{}]".format(section)) 
      for key, value in sorted(config[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 

    log.setLevel(getattr(logging, config.get('pull', 'loglevel').upper(), 
         None)) 
    # Setup our event loop 
    loop = asyncio.get_event_loop() 

    # Register shutdown to signals 
    loop.add_signal_handler(signal.SIGHUP, shutdown) 
    loop.add_signal_handler(signal.SIGTERM, shutdown) 

    # a temporary directory to store fetched data 
    tmp_dst_dir = config['pull']['tmp-dst-dir'] 
    # a permanent directory to move data from the temporary directory. Data are 
    # picked up by the process daemon from that directory. 
    dst_dir = config['pull']['dst-dir'] 
    for directory in dst_dir, tmp_dst_dir: 
     try: 
      os.makedirs(directory) 
     except OSError as exc: 
      # errno 17 => file exists 
      if exc.errno != 17: 
       sys.exit("failed to make directory {d}:{e}".format(d=directory, 
                    e=exc)) 
    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 
+0

Das Problem wurde gefunden. Auf dem System, auf dem ich Python 3.4.4 verwendet habe, plant die Coroutine pull_stats die get-Coroutinen nicht, da die socket_files-Liste leer ist. Das erklärt die Nachricht [Mainthread: shutdown] INFO result = false> da Aufgabe Löschung abgeschlossen ist nicht stattfindet und als Folge davon das try-catch erhält nie die Ausnahme den Ausgang des Programms zu verursachen. Auf einer anderen Box mit 3.4.4, wo die socket_files-Liste * nicht * leer ist, funktioniert die Löschung –

Verwandte Themen