2012-05-22 30 views
6

Ich benutze Django und Sellerie und ich versuche, Routing zu mehreren Warteschlangen einzurichten. Wenn ich die Aufgaben routing_key und exchange (entweder im Task Decorator oder unter Verwendung von apply_async()) spezifiziere, wird die Aufgabe dem Broker nicht hinzugefügt (das ist Kombu, das eine Verbindung zu meiner MySQL-Datenbank herstellt).Django & Sellerie - Routing Probleme

Wenn ich den Namen der Warteschlange im Task Decorator angeben (was bedeutet, dass der Routing-Schlüssel ignoriert wird), funktioniert die Aufgabe gut. Es scheint ein Problem mit dem Routing/Exchange-Setup zu sein.

Irgendeine Idee, was das Problem sein könnte?

Hier ist das Setup:

settings.py

INSTALLED_APPS = (
    ... 
    'kombu.transport.django', 
    'djcelery', 
) 
BROKER_BACKEND = 'django' 
CELERY_DEFAULT_QUEUE = 'default' 
CELERY_DEFAULT_EXCHANGE = "tasks" 
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" 
CELERY_DEFAULT_ROUTING_KEY = "task.default" 
CELERY_QUEUES = { 
    'default': { 
     'binding_key':'task.#', 
    }, 
    'i_tasks': { 
     'binding_key':'important_task.#', 
    }, 
} 

tasks.py

from celery.task import task 

@task(routing_key='important_task.update') 
def my_important_task(): 
    try: 
     ... 
    except Exception as exc: 
     my_important_task.retry(exc=exc) 

Aufgabe einleiten:

from tasks import my_important_task 
my_important_task.delay() 
+0

Wie übergeben Sie routing_key ? Mit async_apply? – mher

+0

Ich verwende die 'delay()' Methode, die nur eine Verknüpfung für 'apply_async()' ist. Ich versuche, die "routing_key" -Spezifikation mit der Task-Methode (über den Dekorator) zu behalten, anstatt wenn sie aufgerufen wird. Ich habe versucht, den Schlüssel mit 'apply_async()' stattdessen zu übergeben, aber ich bekomme das gleiche Problem. –

+0

Verzögerung akzeptiert routing_key-Schlüsselwort nicht. Es ist eine vereinfachte Version von apply_async, aber sie sind nicht identisch. – mher

Antwort

43

Sie sind die Django ORM als Broker, dh Erklärungen nur im Speicher abgelegt werden (siehe, inarguably hart, Transportvergleichstabelle bei http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison zu finden)

Also, wenn Sie diese Aufgabe mit routing_key anwenden wird es nicht möglich sein, es zu routen, weil es die Warteschlange noch nicht deklariert hat.

Es wird funktionieren, wenn Sie dies tun:

@task(queue="i_tasks", routing_key="important_tasks.update") 
def important_task(): 
    print("IMPORTANT") 

Aber es wäre viel einfacher für Sie die automatische Routing-Funktion nutzen zu können, da gibt es hier nichts, was Sie brauchen verwenden zeigt ' Thema‘Austausch, auf automatisches Routing verwenden Sie einfach entfernt die Einstellungen:

  • CELERY_DEFAULT_QUEUE
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

Und erklären Sie Ihre Aufgabe wie folgt aus:

@task(queue="important") 
def important_task(): 
    return "IMPORTANT" 

und dann einen Arbeiter zu starten aus dieser Warteschlange raubend:

$ python manage.py celeryd -l info -Q important 

oder sowohl von der Standard (celery) Warteschlange und der important Warteschlange zu konsumieren:

$ python manage.py celeryd -l info -Q celery,important 

Eine weitere gute Praxis ist nicht die Queue-Namen in die Aufgabe zu codieren und CELERY_ROUTES stattdessen verwenden :

@task 
def important_task(): 
    return "DEFAULT" 

dann in Ihren Einstellungen:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}} 

Wenn Sie immer noch darauf bestehen Thema Börsen verwenden, dann könnten Sie diesen Router hinzufügen, um automatisch alle Warteschlangen zum ersten Mal eine Aufgabe gesendet wird zu erklären:

class PredeclareRouter(object): 
    setup = False 

    def route_for_task(self, *args, **kwargs): 
     if self.setup: 
      return 
     self.setup = True 
     from celery import current_app, VERSION as celery_version 
     # will not connect anywhere when using the Django transport 
     # because declarations happen in memory. 
     with current_app.broker_connection() as conn: 
      queues = current_app.amqp.queues 
      channel = conn.default_channel 
      if celery_version >= (2, 6): 
       for queue in queues.itervalues(): 
        queue(channel).declare() 
      else: 
       from kombu.common import entry_to_queue 
       for name, opts in queues.iteritems(): 
        entry_to_queue(name, **opts)(channel).declare() 
CELERY_ROUTES = (PredeclareRouter(),) 
+0

Danke für die Erklärung! –

+2

Wird dieses Problem bei Warteschlangendeklarationen und Austauschen in Sellery 3 behoben? Ich benutze die neue 'CELERY_QUEUES = (Queue (...), ...)' in den Einstellungen, heißt das, dass die Queues korrekt deklariert werden? –

+0

Hinweis: Seit Cellery 4.0 wurde CELERY_ROUTES durch CELERY_TASK_ROUTES ersetzt. Könnte jemand Zeit sparen. –