Sie können entweder einen benutzerdefinierten Lader, schreiben oder die Signale verwenden.
Lader haben die on_task_init
Methode, die aufgerufen wird, wenn eine Aufgabe ausgeführt werden soll, über, und on_worker_init
, die durch den Sellerie + celerybeat Hauptprozess bezeichnet wird.
Signale verwenden ist wahrscheinlich die einfachste, die Signale zur Verfügung stehen:
0.8.x:
task_prerun(task_id, task, args, kwargs)
ausgelöst, wenn eine Aufgabe ist, etwa durch den Arbeiter ausgeführt werden (oder lokal wenn apply
verwendet wird/oder wenn CELERY_ALWAYS_EAGER
eingestellt wurde).
task_postrun(task_id, task, args, kwargs, retval)
Wird ausgelöst, nachdem eine Aufgabe unter den gleichen Bedingungen wie oben ausgeführt wurde.
task_sent(task_id, task, args, kwargs, eta, taskset)
aufgerufen, wenn eine Aufgabe (gut nicht für lang laufende Operationen) angewendet wird
Zusätzlich Signale in 0.9.x (aktuelle Master-Zweig auf Github):
worker_init()
Wird aufgerufen, wenn selleryd gestartet wurde (bevor die Task initialisiert wird. Wenn also auf einem System, das fork
unterstützt, alle Speicheränderungen in die untergeordneten Worker-Prozesse kopiert werden).
worker_ready()
aufgerufen, wenn celeryd in der Lage, Aufgaben zu erhalten.
worker_shutdown()
aufgerufen, wenn celeryd heruntergefahren wird.
Hier ist ein Beispiel Vorberechnung etwas zum ersten Mal eine Aufgabe im Prozess ausgeführt wird:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Wenn Sie die Funktion wollen für alle Aufgaben ausgeführt werden, lassen Sie die sender
Argument.
Welche Art von benutzerdefinierten Initialisierung benötigen Sie ausführen? – diegueus9
Ich muss eine ~ 10MB Datenstruktur laden, die für die Verarbeitung jeder Aufgabe benötigt wird (die Struktur ist für alle Aufgaben identisch). – xelk