2013-02-19 9 views
9

Wenn ich Route eine Aufgabe zu einer bestimmten Warteschlange funktioniert es:Wie lenke ich eine Kette von Aufgaben an eine bestimmte Schlange in Sellerie?

task.apply_async(queue='beetroot') 

Aber wenn ich eine Kette erstellen:

chain = task | task 

Und dann schreibe ich

chain.apply_async(queue='beetroot') 

Es zu ignorieren scheint das Schlüsselwort queue und ordnet es der Standardwarteschlange 'sellery' zu.

Es wäre schön, wenn Sellerie das Routing in Ketten unterstützt - alle Tasks werden sequenziell in derselben Warteschlange ausgeführt.

Antwort

10

Ok, ich habe diese herausgefunden.

Sie haben die erforderlichen Ausführungsoptionen wie queue = oder Countdown hinzufügen = zur Unteraufgabe Definition oder durch einen Teil:

Unteraufgabe Definition:

from celery import subtask 

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot') 

Teil:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot') 

Dann führen Sie die Kette durch:

chain.apply_async() 

oder

chain.delay() 

Und die Aufgaben werden auf die ‚Rote Bete‘ Warteschlange gesendet werden. Zusätzliche Ausführungsargumente in diesem letzten Befehl werden nichts bewirken. Es wäre nett gewesen, alle diese Ausführungsargumente auf der Chain- (oder Group- oder irgendeiner anderen Canvas-Primitiven) Ebene anzuwenden.

+2

Hmmm, das Teil Beispiel nicht für mich arbeiten, bekam ich folgende Fehlermeldung zurück: Typeerror: nicht unterstützte Operandtyp (e) für |: ‚AsyncResult‘ und 'AsyncResult' (unter Verwendung von 3.0.23) – Clara

+0

Ich hatte eigene Probleme, als ich versuchte, die "Kette" zur Ausführung der zweiten Aufgabe zu bekommen. Frage: Wenn Sie bei beiden Aufgaben 'apply_async' aufrufen, ist das wirklich eine Kette? Werden nicht beide Aufgaben von alleine ausgeführt? Ich habe Ihre Syntax ausprobiert und es ist fehlgeschlagen, weil in meinem Fall die erste Teilaufgabe einen Wert zurückgibt, der von der Sekunde verwendet wird. – PritishC

12

Ich mache es wie folgt aus:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue) 
mychain = celery.chain(subtask, subtask2, ...) 
mychain.apply_async() 
+0

So funktioniert es, wenn 'queue' auf Signatur angegeben ist, aber nicht, wenn es an 'apply_async' übergeben wird? Weißt du, ob es eine gute Dokumentation für dieses Feature gibt? – dashesy

+0

Können verschiedenen Unteraufgaben in derselben Kette verschiedene Warteschlangen zugewiesen werden? – ForeverWintr

3

Das ist ziemlich spät, aber ich glaube nicht, das von @mpaf bereitgestellten Code ganz korrekt ist.

Kontext: In meinem Fall habe ich zwei Teilaufgaben, von denen die erste einen Rückgabewert liefert, der an die Sekunde als Eingabeargument weitergegeben wird. Ich hatte Schwierigkeiten, die zweite Aufgabe auszuführen - ich sah in den Protokollen, dass Sellery die zweite Aufgabe als Rückruf der ersten anerkennen würde, aber sie würde die zweite nie ausführen.

Das ist mein Nicht-Arbeits Chain Code war -:

from celery import chain 

chain(
    module.task1.s(arg), 
    module.task2.s() 
).apply_async(countdown=0.1, queue='queuename') 

die in Antwort des @ MPAF bereitgestellt Syntax verwenden, habe ich beiden Aufgaben auszuführen, aber die Ausführungsreihenfolge war planlos und die zweite Teilaufgabe nicht bestätigt wurde als Rückruf des ersten. Ich hatte die Idee, die Dokumente zu durchsuchen, um eine Warteschlange für eine Teilaufgabe explizit festzulegen.

Dies ist der Arbeitskreis -:

chain(
    module.task1.s(arg).set(queue='queuename'), 
    module.task2.s().set(queue='queuename') 
).apply_async(countdown=0.1) 
+0

arbeitete für mich, bro – phacic

Verwandte Themen