2013-07-04 14 views
18

Ich habe eine check_orders Task, die in regelmäßigen Abständen ausgeführt wird. Es macht eine Gruppe von Aufgaben, so dass ich die Ausführungszeit der Aufgaben bestimmen kann, und wenn alle erledigt sind (dies ist der Zweck von res.join [1] und grouped_subs). Die Aufgaben, die gruppiert sind, sind Paare von verkettete Aufgaben.Sellerie Stop Ausführung einer Kette

Was ich will ist, wenn die erste Aufgabe eine Bedingung nicht erfüllt (fehlschlägt) führen Sie nicht die zweite Aufgabe in der Kette. Ich kann das nicht für das Leben von mir herausfinden und ich denke, das ist ziemlich grundlegende Funktionalität für einen Jobwarteschlangenmanager. Wenn ich die Dinge ausprobiere, die ich nach [2] auskommentiert habe (Ausnahmen auslösen, Callbacks entfernen) ... bleiben wir aus irgendeinem Grund in der join() in check_orders stecken (es bricht die Gruppe). Ich habe versucht, ignore_result für alle diese Aufgaben auf False zu setzen, aber es funktioniert immer noch nicht.

@task(ignore_result=True) 
def check_orders(): 
    # check all the orders and send out appropriate notifications 
    grouped_subs = [] 

    for thingy in things: 
     ... 

     grouped_subs.append(chain(is_room_open.subtask((args_sub_1,)), 
         notify.subtask((args_sub_2,), immutable=True))) 

    res = group(grouped_subs).apply_async() 

    res.join()   #[1] 
    logger.info('Done checking orders at %s' % current_task.request.id)) 

@task(ignore_result=True) 
def is_room_open(args_sub_1): 
    #something time consuming 
    if http_req_and_parse(args_sub_1): 
     # go on and do the notify task 
     return True 
    else: 
     # [2] 
     # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how? 
     # None of the following things work: 
     # is_room_open.update_state(state='FAILURE') 
     # raise celery.exceptions.Ignore() 
     # raise Exception('spam', 'eggs') 
     # current_task.request.callbacks[:] = [] 

@task(ignore_result=True) 
def notify(args_sub_2): 
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful' 
    notify_user(args_sub_2) 

Antwort

1

Erstens scheint es, wenn in der Funktion Ausnahme besteht ignore_result nicht Ihnen helfen.

Zweitens verwenden Sie unveränderlich = True Es bedeutet, dass nächste Funktion (in unserem Fall ist benachrichtigen) übernimmt keine zusätzliche Argumente. Sie sollten natürlich notify.subtask((args_sub_2,), immutable=False) verwenden, wenn es für Ihre Entscheidung geeignet ist.

Drittens können Sie verwenden Abkürzungen:

notify.si(args_sub_2) statt notify.subtask((args_sub_2,), immutable=True)

und

is_room_open.s(args_sub_1) statt is_room_open.subtask((args_sub_1,))

Ausprobieren Verwenden Sie den Code ein:

@task 
def check_orders(): 
    # check all the orders and send out appropriate notifications 
    grouped_subs = [] 

    for thingy in things: 
     ... 

     grouped_subs.append(chain(is_room_open.s(args_sub_1), 
            notify.s(args_sub_2))) 

    res = group(grouped_subs).apply_async() 

    res.join()   #[1] 
    logger.info('Done checking orders at %s' % current_task.request.id)) 

@task 
def is_room_open(args_sub_1): 
    #something time consuming 
    if http_req_and_parse(args_sub_1): 
     # go on and do the notify task 
     return True 
    else: 
     # [2] 
     # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how? 
     # None of the following things work: 
     # is_room_open.update_state(state='FAILURE') 
     # raise celery.exceptions.Ignore() 
     # raise Exception('spam', 'eggs') 
     # current_task.request.callbacks[:] = [] 
     return False 

@task 
def notify(result, args_sub_2): 
    if result: 
     # something else time consuming, only do this if the first part of the chain 
     # passed a test (the chained tasks before this were 'successful' 
     notify_user(args_sub_2) 
     return True 
    return False 

Wenn Sie Ausnahmen wollen fangen Sie Rückruf als so

is_room_open.s(args_sub_1, link_error=log_error.s())

from proj.celery import celery 

@celery.task 
def log_error(task_id): 
    result = celery.AsyncResult(task_id) 
    result.get(propagate=False) # make sure result written. 
    with open(os.path.join('/var/errors', task_id), 'a') as fh: 
     fh.write('--\n\n%s %s %s' % (
      task_id, result.result, result.traceback)) 
+0

Vielen Dank für die Tipps über Verknüpfungen. Obwohl das funktionieren würde, löst es mein Problem nicht. Ich möchte, dass die zweite Aufgabe niemals ausgeführt wird, wenn die erste fehlschlägt. Diese Lösung hat immer noch den Aufwand, die zweite Aufgabe jedes Mal unabhängig von den Ergebnissen der ersten Aufgabe zu starten. Ich möchte die Ausführung der Kette stoppen. – Salami

+0

Ich habe dich verstanden. Wenn die Aufgabe ausgelöst wird, wird die Ausführung der Kette gestoppt. Sein Verhalten ist standardmäßig. Sie müssen keine spezielle Entscheidung dafür suchen. –

+0

@Alexander, das Anheben der Ausnahme funktioniert NICHT richtig. "Wenn ich die Dinge ausprobiere, die ich nach [2] auskommentiert habe (Ausnahmen auslösen, Rückrufe entfernen) ... bleiben wir aus irgendeinem Grund in der join() in check_orders stecken (es bricht die Gruppe auf)." – Salami

12

Meiner Meinung nach ist dies ein gemeinsamer Anwendungsfall, der nicht genug Liebe bekommt in der Dokumentation verwenden.

Angenommen, Sie möchten eine Kette in der Mitte abbrechen, während Sie SUCCESS weiterhin als Status der abgeschlossenen Aufgaben melden und kein Fehlerprotokoll oder Ähnliches senden (andernfalls können Sie eine Ausnahme auslösen).

@app.task(bind=True) # Note that we need bind=True for self to work 
def task1(self, other_args): 
    #do_stuff 
    if end_chain: 
     self.request.callbacks = None 
     return 
    #Other stuff to do if end_chain is False 

So in Ihrem Beispiel:

@app.task(ignore_result=True, bind=True) 
def is_room_open(self, args_sub_1): 
    #something time consuming 
    if http_req_and_parse(args_sub_1): 
     # go on and do the notify task 
     return True 
    else: 
     self.request.callbacks = None 

arbeiten. Beachten Sie, dass anstelle von ignore_result=True und subtask() Sie die Verknüpfung .si() wie angegeben verwenden können, um @ Abbasov-alexander

Edited mit EAGER Modus zu arbeiten, wie @PhilipGarnero in den Kommentaren vorgeschlagen.

+1

Wenn Sie Aufgaben im EAGER-Modus ausführen, wird die Aufgabe durch den obigen Vorgang beendet. Ich ersetzte 'self.request.callbacks [:] = []' durch 'self.request.callbacks = None' und funktioniert jetzt in beiden Fällen. – PhilipGarnero

+0

Wenn es in beiden Fällen funktioniert, empfehlen wir das dann. Danke für das Kommentieren, um die Antwort zu verbessern :) – AntonioMO

+4

Anscheinend funktioniert es nicht mehr für Sellerie 4.0, aber 'self.request.chain = None' tut. http://StackOverflow.com/Questions/23793928/Sellerie-clean-way-of-Revoking-the-entire-chain-from-within-a-Task/40579984#40579984 –

6

Es ist unglaublich, da ein so üblicher Fall in keiner offiziellen Dokumentation behandelt wird.Ich hatte mit dem gleichen Problem zu bewältigen (aber shared_tasks mit bind Option, so haben wir die Sichtbarkeit von self Objekt), so schrieb ich einen benutzerdefinierten Dekorateur, die automatisch den Widerruf Griffe:

def revoke_chain_authority(a_shared_task): 
    """ 
    @see: https://gist.github.com/bloudermilk/2173940 
    @param a_shared_task: a @shared_task(bind=True) celery function. 
    @return: 
    """ 
    @wraps(a_shared_task) 
    def inner(self, *args, **kwargs): 
     try: 
      return a_shared_task(self, *args, **kwargs) 
     except RevokeChainRequested, e: 
      # Drop subsequent tasks in chain (if not EAGER mode) 
      if self.request.callbacks: 
       self.request.callbacks[:] = [] 
      return e.return_value 

    return inner 

Sie können es wie folgt :

@shared_task(bind=True) 
@revoke_chain_authority 
def apply_fetching_decision(self, latitude, longitude): 
    #... 

    if condition: 
     raise RevokeChainRequested(False) 

Siehe die vollständige Erklärung here. Hoffe es hilft!

+0

Vielen Dank. Ausgezeichnete Lösung. – kstratis

+0

Es scheint, dass jetzt die 'Callbacks'-Variable ein Tupel ist, also gibt es einen Fehler zurück, wenn man versucht, diese Operation auszuführen: ' self.request.callbacks [:] = [] '' 'Zeilenumbruch' '' TypeError: 'tuple' -Objekt unterstützt keine Artikelzuweisung' – mccc

Verwandte Themen