2013-04-30 8 views
8

ich eine Anwendung Ich schreibe die eine Reihe von Aufgaben parallel ausgeführt werden muss und dann eine einzelne Aufgabe mit den Ergebnissen aller Aufgaben ausführen:eine Aufgabe Laufen, nachdem alle Aufgaben abgeschlossen wurden

@celery.task 
def power(value, expo): 
    return value ** expo 

@celery.task 
def amass(values): 
    print str(values) 

Es ist ein sehr konstruiertes und vereinfachtes Beispiel, aber hoffentlich kommt der Punkt gut rüber. Grundsätzlich habe ich viele Elemente, die durch power laufen müssen, aber ich möchte nur amass auf die Ergebnisse von allen Aufgaben ausführen. All dies sollte asynchron passieren, und ich brauche nichts zurück von der amass Methode.

Kann jemand in Sellerie einrichten, so dass alles asynchron ausgeführt wird und ein einziger Rückruf mit einer Liste der Ergebnisse aufgerufen wird, nachdem alles gesagt und getan ist?

ich Setup habe dieses Beispiel mit einem chord als Alexander Afanasiev laufen empfohlen:

from time import sleep 

import random 

tasks = [] 

for i in xrange(10): 
    tasks.append(power.s((i, 2))) 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 

callback = amass.s() 

r = chord(tasks)(callback) 

Leider ist in dem obigen Beispiel alle Aufgaben in tasks gestartet werden, nur dann, wenn die chord Methode aufgerufen wird. Gibt es eine Möglichkeit, dass jede Aufgabe einzeln gestartet werden kann und ich dann einen Rückruf zu der Gruppe hinzufügen kann, die ausgeführt wird, wenn alles fertig ist?

Antwort

3

Hier ist eine Lösung, die für meine Zwecke gearbeitet:

tasks.py:

from time import sleep 

import random 

@celery.task 
def power(value, expo): 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 
    return value ** expo 

@celery.task 
def amass(results, tasks): 
    completed_tasks = [] 
    for task in tasks: 
     if task.ready(): 
      completed_tasks.append(task) 
      results.append(task.get()) 

    # remove completed tasks 
    tasks = list(set(tasks) - set(completed_tasks)) 

    if len(tasks) > 0: 
     # resend the task to execute at least 1 second from now 
     amass.delay(results, tasks, countdown=1) 
    else: 
     # we done 
     print results 

Use Case:

tasks = [] 

for i in xrange(10): 
    tasks.append(power.delay(i, 2)) 

amass.delay([], tasks) 

Was sollte diese tun ist Start alle Aufgaben so schnell wie möglich asynchron. Sobald sie alle in die Warteschlange eingereiht wurden, wird die Aufgabe amass ebenfalls in die Warteschlange gestellt. Die Sammelaufgabe wird sich so lange selbst aktualisieren, bis alle anderen Aufgaben abgeschlossen sind.

+0

Hallo, sieht so aus, als ob das Konzept zumindest konzeptionell gut ist. Jedoch, wenn ich es ausprobierte, der genau gleiche Code wie oben, wirft es den folgenden Fehler: 'EncodeError: ist nicht JSON serializable' Würde wirklich etwas Hilfe hier schätzen . – qre0ct

+0

Ok, ich löste den obigen Fehler, indem ich die final_task() eine Liste von taskIds selbst übergebe, anstatt sie an die Liste der Aufgabenobjekte zu übergeben, wie im obigen Codebeispiel. Danke trotzdem für die Antwort. Es hat sehr geholfen. – qre0ct

3

Sellerie hat plenty of tools für die meisten Workflows, die Sie sich vorstellen können.

Es scheint, dass Sie chord verwenden müssen. Hier ist ein Zitat von docs:

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

+0

Das ist definitiv richtig, aber es gibt ein Problem damit. Ich habe meine Antwort mit den Details aktualisiert. –

0

Die Antwort, dass @ alexander-Afanasiev haben Sie im Wesentlichen richtig sind: einen Akkord verwenden.

Ihr Code ist OK, aber tasks.append(power.s((i, 2))) führt die Teilaufgabe nicht aus, sondern fügt nur subtasks zu einer Liste hinzu. Es ist chord(...)(...) die eine, die so viele Nachrichten an den Broker senden wie Teilaufgaben, die Sie in tasks Liste definiert haben, plus eine weitere Nachricht für die Rückruf-Teilaufgabe. Wenn Sie chord aufrufen, wird es zurückgegeben, sobald es möglich ist.

Wenn Sie wissen möchten, wann der Akkord fertig ist, können Sie wie bei einer einzelnen Aufgabe unter Verwendung von r.ready() in Ihrem Sample nach Abschluss suchen.

+0

Ich möchte, dass jede Teilaufgabe ausgeführt wird, sobald sie gepostet wird, nicht wenn der Akkord gepostet wird. Ist das möglich? –

+0

Nun, machen Sie einfach eine 'power.delay (i, 2)' in der Schleife und rufen Sie alle Zwischenergebnisse ab, bevor Sie 'amass (results)' aufrufen. Aber ich verstehe den Punkt nicht wirklich. Mit dem Akkord werden die Teilaufgaben 'power.s' ausgeführt, sobald sie als Nachrichten im Broker und' amass' nach deren Beendigung verfügbar sind. Ich denke, Sie sollten klarstellen, was Sie erreichen möchten, weil Ihr Wunsch, die Aufgaben asynchron auszuführen, der von Ihnen vorgeschlagenen Verwendung widerspricht. – enlavin

+0

Ich habe eine Lösung gefunden, die zeigt, was ich machen wollte. –

0

einen Blick auf diese Schnipsel aus Ihrer Frage machen, sieht es aus wie Sie einen list als Akkord-Header sind vorbei, anstatt ein group:

from time import sleep 
import random 

tasks = [] 

for i in xrange(10): 
    tasks.append(power.s((i, 2))) 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 

callback = amass.s() 

r = chord(tasks)(callback) 

die list zu einem group Konvertieren in das Verhalten führen sollte Sie erwarten:

... 

callback = amass.s() 

tasks = group(tasks) 

r = chord(tasks)(callback) 
Verwandte Themen