ordnungsgemäß beenden Ich habe vorher how to run several autobahn.ApplicationSession
instances from within the same python process gefragt, ohne sie zu blockieren.Autobahn ApplicationRunner() .run() mit signal.SIGINT
Das Problem wurde behoben, aber ich habe ein neues Problem festgestellt.
Die Beendigung dieser mp.Process
Instanzen erweist sich als schwierig. Ich weiß, dass der Code in ApplicationRunner.run()
Exits auf KeyboardInterrupt
, aber ich konnte nicht erhalten, um dies richtig auszulösen.
Beispielcode:
class PoloniexSession(ApplicationSession):
@coroutine
def onJoin(self, *args, **kwargs):
channel = self.config.extra['channel']
def onTicker(*args, **kwargs):
self.config.extra['queue'].put((channel, (args, kwargs, time.time())))
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
raise
class PlnxEndpoint(mp.Process):
def __init__(self, endpoint, q, **kwargs):
super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
endpoint, **kwargs)
self.endpoint = endpoint
self.q = q
def run(self):
self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
extra={'channel': self.endpoint,
'queue': self.q})
self.runner.run(PoloniexSession)
def join(self, *args, **kwargs):
def sig_handler(x, y):
pass
signal.signal(signal.SIGINT, sig_handler)
super(PlnxEndpoint, self).join(*args, **kwargs)
class PoloniexWSS(WSSAPI):
def __init__(self, endpoints=None):
super(PoloniexWSS, self).__init__(None, 'Poloniex')
self.data_q = mp.Queue()
self.connections = {}
if endpoints:
self.endpoints = endpoints
else:
r = requests.get('https://poloniex.com/public?command=returnTicker')
self.endpoints = list(r.json().keys())
self.endpoints.append('ticker')
for endpoint in self.endpoints:
self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q)
def start(self):
super(PoloniexWSS, self).start()
for conn in self.connections:
self.connections[conn].start()
def stop(self):
for conn in self.connections:
self.connections[conn].join()
super(PoloniexWSS, self).stop()
Während diese self.q
ausreichend füllt, habe ich noch einen Fehler erhalten, wenn meine Subprozesse gestoppt werden:
RuntimeError: Event loop stopped before Future completed.
Traceback (most recent call last):
File "/home/nils/anaconda3/lib/python3.5/multiprocessing /process.py", line 254, in _bootstrap
self.run()
File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run
self.runner.run(PoloniexSession)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run
loop.run_until_complete(protocol._session.leave())
File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
Was mich führt zu glauben, dass meine signal.SIGINT
nicht ausgelöst wird, wo Ich will es.
According to the source code of ApplicationRunner.run()
, sollte ein SIGINT
/KeyboardInterrupt
würde in Würde enden in der serve_forever()
Methode enden.
Manuelles Schließen der asyncio.event_loop
Ergebnisse in der obigen Fehler auch:
class PlnxEndpoint(mp.Process):
#...
def join(self, *args, **kwargs):
loop = get_event_loop()
loop.stop()
super(PlnxEndpoint, self).join(*args, **kwargs)
#...
Wir sollten dies in der vorherigen Frage behandeln, schlage ich vor, diese Frage zu schließen. – stovfl
hm, ich stimme nicht zu. Es ist ein separates Problem, nach allem. 'Running 2 ApplicationRunner(). Run() Methoden in separaten Prozessen" vs 'Schließen eines ApplicationRunner in einem separaten Prozess". Sie sind zweifellos verwandt, aber immer noch zwei verschiedene Probleme. – nlsdfnbch