2017-07-26 3 views
0

Ich versuche, bestehenden Sellerie-Gruppenruf in einen Akkord zu konvertieren, um Deadlocks zu verhindern. Der vorherige Code hatte Wiederholungen und eine Ablaufzeit. Ich habe es geschafft, dass der Akkord ohne diese Einstellungen funktioniert, aber wenn ich versuche, die Einstellung zu übernehmen, sehe ich nicht, dass die Aufgaben ausgeführt werden. Ich habe in der Dokumentation nichts darüber gesehen, wie man die gleichen Einstellungen auf den Akkord als Ganzes anwendet. Ich betreibe Sellerie Version 3.1.6.Aufgabenoptionen, die Aufgaben davon abhalten, im Sellerie-Akkord zu laufen

zurück Code:

jobs = group([reset_device.s(topoid, dev_list[i], 
       waittime_list[i], skipflag) for i in range(len(dev_list))] 
      ).apply_async(expires=waittime, retry=True, retry_policy={ 
                'max_retries': 3, 
                'interval_start': 0.5, 
                'interval_step': 0.2, 
                'interval_max': 0.2}) 
results = jobs.join_native(timeout=waittime + 600, propagate=True) 

Arbeits Akkord (ohne Einstellungen):

jobs = chord([reset_device.s(topoid, dev_list[i], 
       waittime_list[i], skipflag) for i in range(len(dev_list))])(callback) 

Nichtarbeits Akkord # 1:

jobs = chord([reset_device.s(topoid, dev_list[i], waittime_list[i], 
      skipflag).set(expires=datetime.now() + timedelta(seconds=waittime)).set(retry=True).set(retry_policy=retry_policy) 
       for i in range(len(dev_list))])(callback) 

Nichtarbeits Akkord # 2

jobs = chord([reset_device.subtask(args=(topoid, dev_list[i], waittime_list[i],skipflag), 
       expires=datetime.now()+timedelta(seconds=waittime), retry=True, retry_policy=retry_policy) 
      for i in range(len(dev_list))])(callback) 

In den beiden Fällen # 1 und # 2 scheinen die Aufgaben im Akkord nicht ausgeführt zu werden. Wie kann ich die Ablaufzeit anwenden und für jede der im Akkord genannten Aufgaben wiederholen?

+0

Können Sie Blume einrichten, um sicherzustellen, dass diese Aufgaben nicht ausgeführt wurden? – jheld

+0

Was ist mit dem 'retry' innerhalb der' reset_device' Task? und definiere den Task Decorator als '@ app.task (max_retries = 3)' und was immer du brauchst .. – ItayB

Antwort

1

Ich fand es heraus und es war eine Mischung aus Problemen.

Das erste Problem war, dass das expires-Feld keine Ganzzahl, nur datetime-Objekte innerhalb eines Akkords (möglicherweise auch Gruppen und Ketten) akzeptierte, obwohl die Dokumentation keinen Unterschied machte. Dies wurde in einer späteren Version behoben, ich habe mit 3.1.25 getestet und konnte das Update überprüfen.

Das zweite Problem ist, dass Sellerie 3.1.6 keine Fehler in Akkorden protokolliert (ich denke auch an Gruppen und Ketten). Dies wurde ebenfalls behoben, ich habe in 3.1.25 getestet und konnte den Fehler sehen.

Das dritte Problem wurde auf die Fehlermeldung im Zusammenhang:

[2017-08-07 18:39:56,043: ERROR/Worker-5] Chord '98246849-0d5d-4be2-85e3-3fc08e90011d' raised: TaskRevokedError(u'expired',) 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 90, in unlock_chord 
ret = j(timeout=3.0, propagate=propagate) 
    File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 691, in join_native 
raise value 
TaskRevokedError: expired 

Das war, weil die Zeitzone nicht richtig war. Ich hatte datetime.now() anstelle von datetime.utcnow() verwendet, was das Problem behebt und in 3.1.6 funktioniert.

Alternativ könnte ich die Sellerie-Konfiguration CELERY_ENABLE_UTC = False eingestellt haben, die standardmäßig auf True gesetzt ist. Das hat mich verwirrt, weil wir die Konfig CELERY_TIMEZONE auf die lokale Zeit eingestellt hatten. Das expires-Feld verwendet bei Verwendung mit einem datetime-Objekt abhängig vom Wert der CELERY_ENABLE_UTC-Einstellung entweder die lokale Zeit oder UTC. Ich empfehle, die beiden Konfigurationseinstellungen gleich zu halten.

Interessanterweise wird die Callback-Funktion erstellt und fragt ab, ob der Akkord trotz der nie ausgeführten Akkord-Tasks ausgeführt wird und für immer dort bleibt. Ich glaube das may have been fixed in celery 4.1.

Verwandte Themen