2010-01-25 9 views
16

Ich habe ein Django-Projekt und versuche, Sellerie zu verwenden, um Aufgaben für die Hintergrundverarbeitung einzureichen (http://ask.github.com/celery/introduction.html). Sellerie lässt sich gut mit Django integrieren und ich konnte meine benutzerdefinierten Aufgaben einreichen und Ergebnisse erhalten.Wie kann ich Sellery einrichten, um vor dem Ausführen meiner Aufgaben eine benutzerdefinierte Initialisierungsfunktion aufzurufen?

Das einzige Problem ist, dass ich keinen vernünftigen Weg der Durchführung einer benutzerdefinierten Initialisierung im Daemon-Prozess finden kann. Ich muss eine teure Funktion aufrufen, die viel Arbeitsspeicher lädt, bevor ich mit der Verarbeitung der Aufgaben anfange, und ich kann es mir nicht leisten, diese Funktion jedes Mal aufzurufen.

Hat jemand dieses Problem schon einmal gehabt? Irgendwelche Ideen, wie man es umgehen kann, ohne den Sellerie-Quellcode zu modifizieren?

Dank

+0

Welche Art von benutzerdefinierten Initialisierung benötigen Sie ausführen? – diegueus9

+0

Ich muss eine ~ 10MB Datenstruktur laden, die für die Verarbeitung jeder Aufgabe benötigt wird (die Struktur ist für alle Aufgaben identisch). – xelk

Antwort

15

Sie können entweder einen benutzerdefinierten Lader, schreiben oder die Signale verwenden.

Lader haben die on_task_init Methode, die aufgerufen wird, wenn eine Aufgabe ausgeführt werden soll, über, und on_worker_init, die durch den Sellerie + celerybeat Hauptprozess bezeichnet wird.

Signale verwenden ist wahrscheinlich die einfachste, die Signale zur Verfügung stehen:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    ausgelöst, wenn eine Aufgabe ist, etwa durch den Arbeiter ausgeführt werden (oder lokal wenn apply verwendet wird/oder wenn CELERY_ALWAYS_EAGER eingestellt wurde).

  • task_postrun(task_id, task, args, kwargs, retval) Wird ausgelöst, nachdem eine Aufgabe unter den gleichen Bedingungen wie oben ausgeführt wurde.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    aufgerufen, wenn eine Aufgabe (gut nicht für lang laufende Operationen) angewendet wird

Zusätzlich Signale in 0.9.x (aktuelle Master-Zweig auf Github):

  • worker_init()

    Wird aufgerufen, wenn selleryd gestartet wurde (bevor die Task initialisiert wird. Wenn also auf einem System, das fork unterstützt, alle Speicheränderungen in die untergeordneten Worker-Prozesse kopiert werden).

  • worker_ready()

    aufgerufen, wenn celeryd in der Lage, Aufgaben zu erhalten.

  • worker_shutdown()

    aufgerufen, wenn celeryd heruntergefahren wird.

Hier ist ein Beispiel Vorberechnung etwas zum ersten Mal eine Aufgabe im Prozess ausgeführt wird:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

Wenn Sie die Funktion wollen für alle Aufgaben ausgeführt werden, lassen Sie die sender Argument.

Verwandte Themen