2016-11-26 4 views
0

Ich habe einen laufenden Sellerie-Server mit Redis als der Borker und Ergebnisspeicher (Python3). Ich möchte eine beliebige Funktion, die nicht auf dem Server registriert wurde, von einem Selleriearbeiter ausführen lassen. Ich habe versucht, diese Funktion mit dem Paket zur Serialisierung marshal (nach Is there an easy way to pickle a python function (or otherwise serialize its code)?) und übertrug den Bytecode zu einem Arbeiter:ausführen beliebige Funktion von Sellerie Arbeiter

celery_server.py:

from celery import Celery 
import types 
import marshal 

app = Celery('tasks', broker='redis://[email protected]//', backend='redis://localhost') 

@app.task 
def run_fct(fct_code, args, kwargs): 
    code = marshal.loads(fct.__code__) 
    func = types.FunctionType(code, globals(), "some_func_name") 

    return fct(*args, **kwargs) 

client.py

from celery_server import run_fct 
import marshal 

def calc(x, y): 
    return x*y 

fct_code = marshal.dumps(calc.func_code) 
run_fct.apply_async((fct_code, 10, 2)) 

ich die Folgefehler auf der Clientseite:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte 

in der Funktion bytes_to_str in kombu.utils.encoding.py.

Gibt es einen anderen oder besseren Weg, um meine Funktion ausführen zu lassen?

Vielen Dank für jede Hilfe.

Antwort

0

fand ich eine Lösung:

celery_server.py: meine Funktion und gibt die verpackte Funktion die Sellerie Aufgabe als Argument durch Umwickeln der UnicodeDecodeError löse

class MyFunction(object): 
    def __init__(self, fct): 
     self.fct_code = marshal.dumps(fct.__code__) 

    def run(self, *args, **kwargs): 
     run_fct.apply_async(args=(self,)+args, kwargs=kwargs, serializer='pickle') 

und in client.py:

from celery_server import MyFunction 

myFct = MyFunction(calc) 
myFct.run(x=10, y=2) 

dann funktioniert es wie ein Charme.

jedoch mit dem Paket cloud löst auch Probleme meiner Funktion der betreffenden Arbeitnehmer Seite Abhängigkeit, zum Beispiel, wenn ich in meiner Funktion von zusätzlichen Funktionen oder Pakete verwendet:

from celery import Celery 
import cloud 
import pickle 

app = Celery('tasks', broker='redis://[email protected]//', backend='redis://localhost') 

class MyFunction(object): 
    def __init__(self, fct): 
     self.serialized_code = cloud.serialization.cloudpickle.dumps(fct) 

    def run(self, *args, **kwargs): 
     run_fct.apply_async(args=(self,)+args, kwargs=kwargs, serializer='pickle') 

@app.task 
def run_fct(myFct, *args, **kwargs): 
    fct = pickle.loads(myFct.serialized_code) 

    return fct(*args, **kwargs) 

client.py:

from tasks import MyFunction 
import time 

def calc(x, y): 
    time.sleep(5) 
    return x*y 

myFct = MyFunction(calc) 
myFct.run(x=10, y=2) 

ABER: der aktuelle cloud unterstützt python3 nicht.

Verwandte Themen