2016-10-02 7 views
1

Ich versuche einen Arbeitsablauf zu entwickeln, der auf Sellerie basiert. Ich benutze Gruppen und Akkorde.Komplexer Arbeitsablauf auf Sellerie

Im Beispiel unten gibt es unabhängige Gruppen ([mytask1, mytask1, mytask1, ..] -> myfinaltask1), wo mytask1 könnten parallel ausgeführt werden, aber myfinaltask1 sollte nach jeder Gruppe aufgerufen werden.

Code:

def func1(date): 
    subtasks = [] 
    for filepath in all_files: 
     kwargs = {'date': date, 'hfile': filepath} 
     subtask = mytask1.subtask(kwargs=kwargs) 
     subtasks.append(subtask) 

    chrd = chord(subtasks) 
    chrdr = chrd(myfinaltask1.s(kwargs={'date': date})) 
    return chrdr 


def main(all_dates): 
    subtasks = [] 
    for ad in all_dates: 
     subtasks.append(func1(ad)) 

    g = group(subtasks) 
    gr = g.apply_async() 
    results = gr.get(propagate=False) # sync wait! 


main([2014, 2015, 2016]) 

Exception geworfen:

File "/mypath/get_evi.py", line 265, in get_evi_year 
    gr = g.apply_async() 
File "/opt/venv/lib/python3.5/site-packages/celery/canvas.py", line 502, in apply_async 
    type = self.type 
File "/opt/venv/lib/python3.5/site-packages/celery/canvas.py", line 569, in type 
    return self.app.tasks[self['task']] 
File "/opt/venv/lib/python3.5/site-packages/celery/canvas.py", line 560, in app 
    return self._app or (self.tasks[0].app if self.tasks else current_app) 
AttributeError: 'bool' object has no attribute 'app' 

Was mache ich falsch?

Antwort

0

Es scheint, dass Sie vergessen haben, subtasks zu group zu wickeln.

def func1(date): 
    subtasks = [] 
    for filepath in all_files: 
     kwargs = {'date': date, 'hfile': filepath} 
     subtask = mytask1.subtask(kwargs=kwargs) 
     subtasks.append(subtask) 

    chrd = chord(header=group(subtasks), body=myfinaltask1.subtask(kwargs={'date': date})) 
    return chrdr 
+4

Danke, ich habe es geschafft, es mit Ihrer Hilfe arbeiten zu lassen! –