2017-01-18 2 views
2

ich so etwas wie dies tun möchten:Schedule Sellerie Aufgabe nach anderen Aufgabe ausgeführt werden soll (en) komplett

results = [] 
for i in range(N): 
    data = generate_data_slowly() 
    res = tasks.process_data.apply_async(data) 
    results.append(res) 
celery.collect(results).then(tasks.combine_processed_data()) 

dh starten asynchrone Aufgaben über einen langen Zeitraum, planen Sie dann eine abhängige Aufgabe, die nur sein ausgeführt, sobald alle früheren Aufgaben abgeschlossen sind.

Ich habe Dinge wie chain und chord angeschaut, aber es scheint, als ob sie nur funktionieren, wenn Sie Ihr Task-Diagramm vollständig im Voraus erstellen können.

Antwort

0

Auf diese Weise würden Sie Ihre Aufgaben synchron ausführen.

Die Lösung hängt vollständig davon ab, wie und wo data gesammelt werden. Grob vorausgesetzt, dass generate_data_slowly und tasks.process_data synchronisiert sind, wäre ein besserer Ansatz, beide in einem task (oder einem chain) und group sie zu verbinden.

chord können Sie einen Rückruf zu diesem group hinzufügen.

wäre Das einfachste Beispiel sein:

from celery import chord 

@app.task 
def getnprocess_data(): 
    data = generate_data_slowly() 
    return whatever_process_data_does(data) 

header = [getnprocess_data.s() for i in range(N)] 
callback = combine_processed_data.s() 

chord(header)(callback).get() 
+0

nein Ich möchte nicht, dass 'generate_data_slowly' und' process_data' synchronisiert werden. 'process_data' sollte von einem Worker ausgeführt werden, während ein anderer Datensatz generiert wird. 'generate_data_slowly' beinhaltet Hardware und muss in einem Master-Prozess laufen, nicht zwischen Arbeitern. – so12311

1

Für alle Interessierten, endete ich diese Schnipsel mit bis:

@app.task(bind=True, max_retries=None) 
def wait_for(self, task_id_or_ids): 
    try: 
     ready = app.AsyncResult(task_id_or_ids).ready() 
    except TypeError: 
     ready = all(app.AsyncResult(task_id).ready() 
        for task_id in task_id_or_ids) 

    if not ready: 
     self.retry(countdown=2**self.request.retries) 

Und den Workflow so etwas wie dieses schreiben:

task_ids = [] 
for i in range(N): 
    task = (generate_data_slowly.si(i) | 
      process_data.si(i) 
      ) 
    task_id = task.delay().task_id 
    task_ids.append(task_id) 

final_task = (wait_for(task_ids) | 
     combine_processed_data.si() 
     ) 

final_task.delay() 
+0

Ehy großartige Lösung .. aber ich mag nicht die 2 ** self.request.retries, da es buchstäblich leicht explodiert: das ist besser, einen statischen Wert zu verwenden. Außerdem gibt es einen Tippfehler im Code: 'final_task = (wait_for (task_ids) | combine_processed_data.si())' sollte 'final_task = (wait_for (task_ids) .si() | combine_processed_data.si())' sein –

Verwandte Themen