2017-07-04 2 views
1

Ich benutze Sellerie Version 4.0.2.Registrieren klassenbasierte Aufgabe in

Im Vergleich zu früheren Versionen von Sellery scheint es, dass klassenbasierte Aufgaben nicht automatisch registriert werden (d. H. Wenn Sie die automatische Erkennung konfiguriert haben).

Ich erreiche jedoch nicht einmal manuell eine klassenbasierte Aufgabe zu registrieren.

Nach dem Sellerie Änderungsprotokoll:

http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1

seit Version manuell 4.0.1 sollte es möglich sein, die Aufgabe zu registrieren:

from celery import Celery, Task 
app = Celery() 

class CustomTask(Task): 

    def run(self): 
     return 'hello' 

app.register_task(CustomTask()) 

Aber das scheint nicht zu funktionieren. Weiß jemand, wie man das erreicht?

habe ich versucht, ein paar Vorschläge, die (abgesehen von Integration eines benutzerdefinierten Aufgaben loader in https://github.com/celery/celery/issues/3744 erwähnt) diskutiert werden:

Register Celery Class-based Task

https://github.com/celery/celery/issues/3615

https://github.com/celery/celery/issues/3744

Antwort

1

Fast da! Sie müssen delay() bei der Aufgabe, die Sie registriert haben, anrufen.

Dies funktionieren würde:

from celery import Celery, Task 

app = Celery() 


class CustomTask(Task): 
    def run(self): 
     return 'hello' 


task = CustomTask() 
app.register_task(task) 

task.delay() 
+0

Aber es funktioniert nicht beim Importieren von 'Task' auf andere Dateien. – harukaeru

0

Wenn Sie shared_task Dekorateur benötigen:

from celery import Task, shared_task 

class CustomTask(Task): 
    def process(self): 
     return 'hello' 

@shared_task(bind=True, base=CustomTask) 
def custom(self): 
    self.process() 

process ist ein benutzerdefinierter Name, die Task gestartet wird (Dekorateur überschreibt run Methode)

bind=True bindet Funktion eine Klasseninstanz

base=CustomTask legt eine Basisklasse für eine Aufgabe

Verwandte Themen