Ich kann nicht herausfinden, wie Sellerie Aufgaben auf modulare Weise (d. H. nicht alle Aufgaben in der gleichen Datei) definieren und sie für die asynchrone Verwendung korrekt registrieren. Ich habe alle Optionen ausprobiert Ich kann off denken:Sellerie Task-Registrierung für asynchrone Aufgaben
- die Dekorateur basierte Aufgaben mit
- mit Klasse basierte Aufgaben
- autodiscover_tasks auf Sellerie App Objekt
- Registrierung Aufgaben manuell, dh
app.tasks.register(Task1())
für eine Klasse mit basierend Aufgabe - Sellerie Arbeiter in demselben Verzeichnis wie celery_app.py Ausführung
- Sellerie Arbeiter in dem Verzeichnis, über den eine Ausführung celery_app.py enthaltenden 012.
- Angabe der App für den
celery worker
Befehl mit der Option -A. - nicht die App für den Sellerie Arbeiter Angabe
- eine leere „Namen“ Eigenschaft in der Klasse basierte Aufgabe angeben.
- einen Namen Eigenschaft Angabe mit dem richtigen Modul enthält
Was ich auch tue, habe ich immer mit ‚KeyError‘ von dem Task-Registry geworfen am Ende, aber nur, wenn sie mit apply_async
ausgeführt wird. Die synchrone Version funktioniert immer gut.
Wenn jemand mir einen Hinweis geben könnte, was ich tun sollte, um das zu beheben, bitte teilen.
ist hier ein minimales Beispiel:
- minimal
- task1
__init__.py
task.py
- task2
__init__.py
- task.py
__init__.py
celery_app.py
start.sh
test.py
- task1
minimal.task1.task
# -*- coding: utf-8 -*-
from celery import Task
from minimal2.celery_app import app
class Task1(Task):
name = ""
def run(self, number):
return number/2.0
app.tasks.register(Task1())
minimal.task2.Aufgabe
# -*- coding: utf-8 -*-
from celery import Task
from minimal2.celery_app import app
class Task2(Task):
name = "minimal2.task2.task.Task2"
def run(self, number):
return number * number
app.tasks.register(Task2())
minimal2.celery_app
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery('minimal', backend='amqp', broker='amqp://')
app.autodiscover_tasks(['task1', 'task2'], 'task')
minimal2/start.sh
#!/bin/bash
set -e
start_celery_service() {
name=$1
pid_file_path="$(pwd)/${name}.pid"
if [ -e "${pid_file_path}" ] ; then
kill $(cat ${pid_file_path}) && :
sleep 3.0
rm -f "${pid_file_path}" # just in case the file was stale
fi
celery -A minimal2.celery_app.app worker -l DEBUG --pidfile=${pid_file_path} --logfile="$(pwd)/${name}.log" &
sleep 3.0
}
prev_dir=$(pwd)
cd "$(dirname "$0")"
cd ../
rabbitmq-server &
start_celery_service "worker1"
cd $prev_dir
Test
from minimal2.task1.task import Task1
print Task1().apply(args=[], kwargs={'number':2}).get()
> 1.0
print Task1().apply_async(args=[], kwargs={'number':2}).get() # (first time: never comes back -> hitting ctrl-c)
print Task1().apply_async(args=[], kwargs={'number':2}).get() # second time
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 194, in get
on_message=on_message,
File "/usr/local/lib/python2.7/site-packages/celery/backends/base.py", line 470, in wait_for_pending
return result.maybe_throw(propagate=propagate, callback=callback)
File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 299, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 292, in throw
self.on_ready.throw(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/vine/promises.py", line 217, in throw
reraise(type(exc), exc, tb)
File "<string>", line 1, in reraise
celery.backends.base.NotRegistered: ''
#.. same spiel with Task2:
#..
> celery.backends.base.NotRegistered: 'minimal2.task2.task.Task2'
#.. same if I do name = __name__ in Task2:
#..
> celery.backends.base.NotRegistered: 'minimal2.task2.task'
# autodiscover had no effect
ich das gleiche Verhalten in Ubuntu in einem Docker Container als macOS auch gehabt haben, sowohl auf der neuesten Version Sellerie auf PyPy:
celery report
software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:2.7.13
billiard:3.5.0.3 py-amqp:2.2.2
platform -> system:Darwin arch:64bit imp:CPython
loader -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled
Danke. Doing 'app = Sellerie ('minimal', backend = 'amqp', broker = 'amqp: //', include = ['task1/task.py', 'task2/task.py'])' erzeugt 'ImportError: Import nach Dateiname wird nicht unterstützt, wenn der Worker gestartet wird. Verwenden Sie dies in der neuesten Sellerie-Version? –
festhalten. Das bringt mich einen Schritt näher: 'app = Sellerie ('minimal', backend = 'amqp', broker = 'amqp: //', include = ['minimal2.task1.task', 'minimal2.task2.task']) '. Dies löst den Registrierungsfehler, den ich hatte! Die schlechte Nachricht ist ... selbst die geposteten minimalen Aufgaben bleiben einfach im Status PENDING :( –
Importieren Sie die Aufgabenmodule anstelle von Dateien oder sogar jeder Aufgabe explizit. Wie 'minimal.task1'. Dies wird den Importfehler beheben –