2017-03-16 6 views
0

Ich möchte Jupyter Kernel in der Seite der Sellerie Arbeiter verwenden. Für jeden Sellerie-Arbeiter wird es einen Jupyter-Kern geben.Zugang Sellerie Arbeiter Instanz innerhalb der Aufgabe

Um dies zu erreichen überschreibe ich die Standard Worker Klasse der Sellerie, bei der Initialisierung des Arbeiters Ich beginne den Jupyter-Kernel und mit der Stop-Methode schließe ich den Jupyter-Kernel.

Das aktuelle Problem, mit dem ich konfrontiert bin, ist, wie kann ich auf diese Kernel-Instanz innerhalb der Aufgabe zugreifen, während die Aufgabe ausgeführt wird?

Gibt es eine bessere Möglichkeit, die Worker Klassendefinition für die celery Anwendung als app.Worker = CustomWorker zu überschreiben?

Hier ist die Sellerie-Konfiguration mit dem Custom Worker.

from __future__ import absolute_import, unicode_literals 
from celery import Celery 
from jupyter_client import MultiKernelManager 

app = Celery('proj', 
    broker='redis://', 
    backend='redis://', 
    include=['tasks']) 

app.conf.update(
    result_expires=3600 
) 

class CustomWorker(app.Worker): 
    def __init__(self, *args, **kwargs): 
     self.km = MultiKernelManager() 
     self.kernel_id = self.km.start_kernel() 
     print("Custom initializing") 
     self.kernel_client = km.get_kernel(kernel_id).client() 
     super(CustomWorker, self).__init__(*args, **kwargs) 

    def on_close(self): 
     self.km.shutdown_kernel(self.kernel_id) 
     super(CustomWorker, self).on_close() 

app.Worker = CustomWorker 

if __name__ == '__main__': 
    app.start() 

Hier ist ein Skelett tasks.py

from __future__ import absolute_import, unicode_literals 
from celery import app 

from celery import Task 
from tornado import gen 
from jupyter_client import MultiKernelManager 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
ioloop.install() 

reply_futures = {} 

# This is my celery task where I pass the arbitary python code to execute on 
# some celery worker(actually to the corresponding kernel) 
@app.task 
def pythontask(code): 
    # I don't know how to get the kernel_client for current celery worker !!? 
    kernel_client = self.get_current_worker().kernel_client 
    mid = kernel_client.execute(code) 

    # defining the callback which will be executed when message arrives on 
    # zmq stream 
    def reply_callback(session, stream, msg_list): 
     idents, msg_parts = session.feed_identities(msg_list) 
     reply = session.deserialize(msg_parts) 
     parent_id = reply['parent_header'].get('msg_id') 
     reply_future = reply_futures.get(parent_id) 
     if reply_future: 
      reply_future.set_result(reply) 

    @gen.coroutine 
    def execute(kernel_client, code): 
     msg_id = kernel_client.execute(code) 
     f = reply_futures[msg_id] = Future() 
     yield f 
     raise gen.Return(msg_id) 

    # initializing the zmq streams and attaching the callback to receive message 
    # from the kernel 
    shell_stream = ZMQStream(kernel_client.shell_channel.socket) 
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket) 
    shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session)) 
    iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session)) 

    # create a IOLoop 
    loop = ioloop.IOLoop.current() 
    # listen on the streams 
    msg_id = loop.run_sync(lambda: execute(kernel_client,code)) 
    print(reply_msgs[msg_id]) 
    reply_msgs[msg_id] = [] 

    # Disable callback and automatic receiving. 
    shell_stream.on_recv_stream(None) 
    iopub_stream.on_recv_stream(None) 

Antwort

0

dass Arbeiter Instanzinformationen Zusätzlich zu dem Request-Objekt meines Problem gelöst. Um das zu tun, habe ich die _process_task Methode der Arbeiterklasse übersteuert.

def _process_task(self, req): 
    try: 
    req.kwargs['kernel_client'] = self.kernel_client 
    print("printing from _process_task {}".format(req.kwargs)) 
    req.execute_using_pool(self.pool) 
    except TaskRevokedError: 
    try: 
     self._quick_release() # Issue 877 
    except AttributeError: 
     pass 
    except Exception as exc: 
    logger.critical('Internal error: %r\n%s',exc, traceback.format_exc(), exc_info=True) 

Hier ist meine Aufgabe, wo ich den Zugriff auf kernel_client

@app.task(bind=True) 
def pythontask(self,code, kernel_client=None): 

    mid = kernel_client.execute(code) 

    print("{}".format(kernel_client)) 
    print("{}".format(mid)) 

Das Ding funktioniert nur, wenn ich Arbeiter im Solo-Modus starten sonst nicht wirft es einige Beizen Fehler. Wie auch immer, ich verwende Solo-Worker, daher ist diese Lösung für mich geeignet.

Verwandte Themen