2017-11-01 3 views
1

Ich kann nicht herausfinden, wie Sellerie Aufgaben auf modulare Weise (d. H. nicht alle Aufgaben in der gleichen Datei) definieren und sie für die asynchrone Verwendung korrekt registrieren. Ich habe alle Optionen ausprobiert Ich kann off denken:Sellerie Task-Registrierung für asynchrone Aufgaben

  • die Dekorateur basierte Aufgaben mit
  • mit Klasse basierte Aufgaben
  • autodiscover_tasks auf Sellerie App Objekt
  • Registrierung Aufgaben manuell, dh app.tasks.register(Task1()) für eine Klasse mit basierend Aufgabe
  • Sellerie Arbeiter in demselben Verzeichnis wie celery_app.py Ausführung
  • Sellerie Arbeiter in dem Verzeichnis, über den eine Ausführung celery_app.py enthaltenden
  • 012.
  • Angabe der App für den celery worker Befehl mit der Option -A.
  • nicht die App für den Sellerie Arbeiter Angabe
  • eine leere „Namen“ Eigenschaft in der Klasse basierte Aufgabe angeben.
  • einen Namen Eigenschaft Angabe mit dem richtigen Modul enthält

Was ich auch tue, habe ich immer mit ‚KeyError‘ von dem Task-Registry geworfen am Ende, aber nur, wenn sie mit apply_async ausgeführt wird. Die synchrone Version funktioniert immer gut.

Wenn jemand mir einen Hinweis geben könnte, was ich tun sollte, um das zu beheben, bitte teilen.

ist hier ein minimales Beispiel:

  • minimal
    • task1
      • __init__.py
      • task.py
    • task2
      • __init__.py
      • task.py
    • __init__.py
    • celery_app.py
    • start.sh
    • test.py

minimal.task1.task

# -*- coding: utf-8 -*- 
from celery import Task 
from minimal2.celery_app import app 
class Task1(Task): 
    name = "" 
    def run(self, number): 
     return number/2.0 

app.tasks.register(Task1()) 

minimal.task2.Aufgabe

# -*- coding: utf-8 -*- 
from celery import Task 
from minimal2.celery_app import app 

class Task2(Task): 
    name = "minimal2.task2.task.Task2" 
    def run(self, number): 
     return number * number 

app.tasks.register(Task2()) 

minimal2.celery_app

# -*- coding: utf-8 -*- 
from celery import Celery 

app = Celery('minimal', backend='amqp', broker='amqp://') 
app.autodiscover_tasks(['task1', 'task2'], 'task') 

minimal2/start.sh

#!/bin/bash 
set -e 

start_celery_service() { 
    name=$1 
    pid_file_path="$(pwd)/${name}.pid" 
    if [ -e "${pid_file_path}" ] ; then 
     kill $(cat ${pid_file_path}) && : 
     sleep 3.0 
     rm -f "${pid_file_path}" # just in case the file was stale 
    fi 
    celery -A minimal2.celery_app.app worker -l DEBUG --pidfile=${pid_file_path} --logfile="$(pwd)/${name}.log" & 
    sleep 3.0 
} 

prev_dir=$(pwd) 
cd "$(dirname "$0")" 
cd ../ 
rabbitmq-server & 
start_celery_service "worker1" 
cd $prev_dir 

Test

from minimal2.task1.task import Task1 
print Task1().apply(args=[], kwargs={'number':2}).get() 
> 1.0 
print Task1().apply_async(args=[], kwargs={'number':2}).get() # (first time: never comes back -> hitting ctrl-c) 
print Task1().apply_async(args=[], kwargs={'number':2}).get() # second time 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 194, in get 
    on_message=on_message, 
    File "/usr/local/lib/python2.7/site-packages/celery/backends/base.py", line 470, in wait_for_pending 
    return result.maybe_throw(propagate=propagate, callback=callback) 
    File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 299, in maybe_throw 
    self.throw(value, self._to_remote_traceback(tb)) 
    File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 292, in throw 
    self.on_ready.throw(*args, **kwargs) 
    File "/usr/local/lib/python2.7/site-packages/vine/promises.py", line 217, in throw 
    reraise(type(exc), exc, tb) 
    File "<string>", line 1, in reraise 
celery.backends.base.NotRegistered: '' 

#.. same spiel with Task2: 
#.. 
> celery.backends.base.NotRegistered: 'minimal2.task2.task.Task2' 

#.. same if I do name = __name__ in Task2: 
#.. 
> celery.backends.base.NotRegistered: 'minimal2.task2.task' 

# autodiscover had no effect 

ich das gleiche Verhalten in Ubuntu in einem Docker Container als macOS auch gehabt haben, sowohl auf der neuesten Version Sellerie auf PyPy:

celery report 

software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:2.7.13 
      billiard:3.5.0.3 py-amqp:2.2.2 
platform -> system:Darwin arch:64bit imp:CPython 
loader -> celery.loaders.default.Loader 
settings -> transport:amqp results:disabled 

Antwort

1

Wenn ich die Frage richtig verstanden, Sie include Argument verwenden können wo du deine Sellerie-App erstellst. Es registriert alle Aufgaben, die in den im Argument include erwähnten Modulen gefunden werden. Zum Beispiel:

celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], 
        CELERY_RESULT_BACKEND=app.config['CELERY_BROKER_URL'], 
        include=['minimal.task1', 'minimal.task2']) 

Bearbeiten von Frage Poster: Zusätzlich, um die korrekte Import Namensgebung, die Task-Klasse name-Eigenschaft zu erhalten muss wie folgt eingestellt werden:

class Task1(Task): 
    name = __name__ 

Wesentliches der Der Wert name zur Zeit der Taskregistrierung muss exakt mit dem Namen übereinstimmen, mit dem die Task auf der Clientseite importiert wird.

+0

Danke. Doing 'app = Sellerie ('minimal', backend = 'amqp', broker = 'amqp: //', include = ['task1/task.py', 'task2/task.py'])' erzeugt 'ImportError: Import nach Dateiname wird nicht unterstützt, wenn der Worker gestartet wird. Verwenden Sie dies in der neuesten Sellerie-Version? –

+0

festhalten. Das bringt mich einen Schritt näher: 'app = Sellerie ('minimal', backend = 'amqp', broker = 'amqp: //', include = ['minimal2.task1.task', 'minimal2.task2.task']) '. Dies löst den Registrierungsfehler, den ich hatte! Die schlechte Nachricht ist ... selbst die geposteten minimalen Aufgaben bleiben einfach im Status PENDING :( –

+0

Importieren Sie die Aufgabenmodule anstelle von Dateien oder sogar jeder Aufgabe explizit. Wie 'minimal.task1'. Dies wird den Importfehler beheben –

Verwandte Themen