2017-03-16 5 views
1

ordnungsgemäß beenden Ich habe vorher how to run several autobahn.ApplicationSession instances from within the same python process gefragt, ohne sie zu blockieren.Autobahn ApplicationRunner() .run() mit signal.SIGINT

Das Problem wurde behoben, aber ich habe ein neues Problem festgestellt.

Die Beendigung dieser mp.Process Instanzen erweist sich als schwierig. Ich weiß, dass der Code in ApplicationRunner.run() Exits auf KeyboardInterrupt, aber ich konnte nicht erhalten, um dies richtig auszulösen.

Beispielcode:

class PoloniexSession(ApplicationSession): 

    @coroutine 
    def onJoin(self, *args, **kwargs): 
     channel = self.config.extra['channel'] 

     def onTicker(*args, **kwargs): 
      self.config.extra['queue'].put((channel, (args, kwargs, time.time()))) 

     try: 
      yield from self.subscribe(onTicker, self.config.extra['channel']) 

     except Exception as e: 
      raise 


class PlnxEndpoint(mp.Process): 
    def __init__(self, endpoint, q, **kwargs): 
     super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' % 
               endpoint, **kwargs) 
     self.endpoint = endpoint 
     self.q = q 

    def run(self): 
     self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1', 
            extra={'channel': self.endpoint, 
              'queue': self.q}) 
     self.runner.run(PoloniexSession) 

    def join(self, *args, **kwargs): 
     def sig_handler(x, y): 
      pass 
     signal.signal(signal.SIGINT, sig_handler) 
     super(PlnxEndpoint, self).join(*args, **kwargs) 


class PoloniexWSS(WSSAPI): 
    def __init__(self, endpoints=None): 
     super(PoloniexWSS, self).__init__(None, 'Poloniex') 
     self.data_q = mp.Queue() 
     self.connections = {} 
     if endpoints: 
      self.endpoints = endpoints 
     else: 
      r = requests.get('https://poloniex.com/public?command=returnTicker') 
      self.endpoints = list(r.json().keys()) 
      self.endpoints.append('ticker') 

     for endpoint in self.endpoints: 
      self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q) 

    def start(self): 
     super(PoloniexWSS, self).start() 
     for conn in self.connections: 
      self.connections[conn].start() 

    def stop(self): 
     for conn in self.connections: 
      self.connections[conn].join() 
     super(PoloniexWSS, self).stop() 

Während diese self.q ausreichend füllt, habe ich noch einen Fehler erhalten, wenn meine Subprozesse gestoppt werden:

RuntimeError: Event loop stopped before Future completed. 
    Traceback (most recent call last): 
    File "/home/nils/anaconda3/lib/python3.5/multiprocessing /process.py", line 254, in _bootstrap 
    self.run() 
    File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run 
    self.runner.run(PoloniexSession) 
    File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run 
    loop.run_until_complete(protocol._session.leave()) 
    File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 

Was mich führt zu glauben, dass meine signal.SIGINT nicht ausgelöst wird, wo Ich will es.

According to the source code of ApplicationRunner.run(), sollte ein SIGINT/KeyboardInterrupt würde in Würde enden in der serve_forever() Methode enden.

Manuelles Schließen der asyncio.event_loop Ergebnisse in der obigen Fehler auch:

class PlnxEndpoint(mp.Process): 
#... 
    def join(self, *args, **kwargs): 
     loop = get_event_loop() 
     loop.stop() 
     super(PlnxEndpoint, self).join(*args, **kwargs) 
#... 
+0

Wir sollten dies in der vorherigen Frage behandeln, schlage ich vor, diese Frage zu schließen. – stovfl

+0

hm, ich stimme nicht zu. Es ist ein separates Problem, nach allem. 'Running 2 ApplicationRunner(). Run() Methoden in separaten Prozessen" vs 'Schließen eines ApplicationRunner in einem separaten Prozess". Sie sind zweifellos verwandt, aber immer noch zwei verschiedene Probleme. – nlsdfnbch

Antwort

0

Afterall, ein wenig das Hantieren die recht einfache Lösung ergab:

Mit multiprocessing.Event(), ich anmutig war in der Lage zu beenden mein Prozess.

class PoloniexSession(ApplicationSession): 

    @coroutine 
    def onJoin(self, *args, **kwargs): 
     channel = self.config.extra['channel'] 

     def onTicker(*args, **kwargs): 
      self.config.extra['queue'].put((channel, (args, kwargs, time.time()))) 

     if self.config.extra['is_killed'].is_set(): 
      raise KeyboardInterrupt() 
     try: 
      yield from self.subscribe(onTicker, self.config.extra['channel']) 

     except Exception as e: 
      raise 


class PlnxEndpoint(mp.Process): 
    def __init__(self, endpoint, q, **kwargs): 
     super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' % 
               endpoint, **kwargs) 
     self.endpoint = endpoint 
     self.q = q 
     self.is_killed = mp.Event() 

    def run(self): 
     self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1', 
            extra={'channel': self.endpoint, 
              'queue': self.q, 
              'is_killed': self.is_killed}) 
     self.runner.run(PoloniexSession) 

    def join(self, *args, **kwargs): 
     self.is_killed.set() 
     super(PlnxEndpoint, self).join(*args, **kwargs) 
+0

Die 'def join()' sind nun wie empfohlen zur Verwendung zurückgekehrt. Habe aber noch kein Bild warum du den 'Prozess' in' join() 'stoppen willst. – stovfl