2016-11-13 2 views
0

ich Kette versuchen folgende Aufgaben mit Sellerie (v4.0),Sellerie generieren Gruppenaufgaben aus Ketten Aufgabe

task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s() 
result = task.get() 

Above Teil bis generate_job_requests als Akkord fein arbeiten. Aber Problem beginnt mit execute_job, wo es eine Liste von Jobs von generate_job_requests erhält, für die ich parallele Aufgaben und später aggregierte Ergebnisse aller Jobs erstellen muss.

Ich versuche zu validieren, ob eine solche Art von Taskgraph mit Sellerie möglich ist? Gibt es einen möglichen alternativen Workflow, um das Problem mit dieser Abhängigkeit zu lösen? Alles, was ich in der Dokumentation fehlt.

Antwort

0

I Karte wie Funktionalität mit Zwischen Aufgabe Schöpfer verwendet, die wie Akkord wirkt,

@shared_task(ignore_result=False) 
def dmap(it, callback, end_task): 
    callback = subtask(callback) 
    grp = group(callback.clone([arg, ]) for arg in it) 
    c = (grp | end_task) 
    return c() 

So Taskflow wurde, da dies reduziert,

task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
     execute_job.s(), aggregate_result.s())).apply_async() 

Für die ultimative Ausgabe der Aufgabe bekommen, ich habe einige zwickt,

# As we get dmap task id here 
dmap_task = celery_app.AsyncResult(task.id) 
dmap_result = dmap_task.get() 
# Get actual aggregate_result task id 
aggr_res_task_id = dmap_result[0][0] 
result = celery_app.AsyncResult(aggr_res_task_id) 
# Here we receive actual output of overall task 
result.get() 

I bezeichnet answer

Verwandte Themen