2016-08-31 3 views
3

Dies ist, was mein Code wieWie Ausnahmen innerhalb Sellerie rückgängig zu machen, wenn sqlalchemy + postgresql mit

import transaction 

@app.task(name='task_name') 
def task_name_fn(*args, **kwargs): 
    with transaction.manager: 
     try: 
      actual_fn(*args, **kwargs) 
      transaction.commit() 
     except: 
      transaction.abort() 

sieht jedoch meine transaction.abort() scheint nicht zurück rollen werden. Alle nachfolgenden Sellerieaufgaben an diesem Arbeiter scheitern. Ich erhalte den folgenden Fehler

Die Transaktion dieser Sitzung wurde aufgrund einer vorherigen Ausnahme während des Flushvorgangs zurückgesetzt. Um eine neue Transaktion mit dieser Sitzung zu beginnen, geben Sie zuerst Session.rollback() aus.

Was mache ich falsch?
Bessere Frage noch, wie würden Sie die task_name_fn schreiben, so dass dieses Problem nicht auftritt?

Antwort

4

Als erstes müssen Sie keine Ausnahmen abfangen, um die Transaktion abzubrechen.

import transaction 

@app.task(name='task_name') 
def task_name_fn(*args, **kwargs): 
    with transaction.manager: 
     actual_fn(*args, **kwargs) 

Die Transaktion wird abgebrochen, wenn eine Ausnahme auftritt.

Als nächstes könnten Sie das im Task Decorator abstrahieren. Somehing wie die (nicht getestet, aber wahrscheinlich funktionieren wie):

from functools import wraps 
import transaction 

def tm_task(f): 
    @wraps(f) 
    def decorated(*args, **kwargs): 
     with transaction.manager: 
      return f(*args, **kwargs) 
    return app.task()(decorated) 

@tm_task 
def actual_fn(*args, **kwargs): 
    pass # your function code here instead of calling other function 

Auch, weil Sie Transaktionen verwenden, können Sie die Warteschlangen Ihrer Aufträge zu verzögern, nachdem die Transaktion verpflichtet hat. Wenn Sie z. B. eine Zeile in Ihre Transaktion einfügen und einen Job in die Warteschlange stellen, der in der Zeile ausgeführt wird, kann dies im Worker vorkommen, bevor die erste Transaktion festgeschrieben wurde und die Zeile außerhalb der Transaktion noch nicht verfügbar ist. Etwas wie:

class AfterCommitTask(Task): 
    def apply_async(self, *args, **kw): 
     tx = transaction.get() 
     def hook(status): 
      if status: # Only queue if the transaction was succesfull. 
       super(AfterCommitTask, self).apply_async(*args, **kw) 
     tx.addAfterCommitHook(hook) 

def tm_task(f): 
    @wraps(f) 
    def decorated(*args, **kwargs): 
     with transaction.manager: 
      return f(*args, **kwargs) 
    return app.task(base=AfterCommitTask)(decorated) 

@tm_task 
def actual_fn(*args, **kwargs): 
    pass # your function code here instead of calling other function 
+0

Sie, mein Freund, haben mich ein paar Stunden der Forschung gerettet! Vielen Dank –

+1

Und Sie haben mich 10K SO Ruf erreicht, danke. Im Ernst, ich wünschte, jemand hätte mir das vor ein paar Jahren gesagt, anstatt es Stück für Stück zu lernen. Ich hoffe, dass andere Leute diese SO Frage + Antwort finden werden. –

+0

Bitte beachten Sie, dass dies keine Transaktionswiederholungen beantwortet, wenn eine Konfliktlösung in der SQL-Datenbank vorliegt. Das Wiederholen ist Voraussetzung, wenn Sie eine effektive Transaktionsisolationsstufe in SQL verwenden, um Race-Bedingungen zu vermeiden. Siehe hierzu ein Beispiel für einen wiederholbaren Transaktions-Handler hier https://websauna.org/docs/api/websauna.system.model.retry.html?highlight=retryable#websauna.system.model.retry.retryable oder Sellerie-Task-Basisklasse Das kann sich erneut versuchen https://websauna.org/docs/api/websauna.system.task.tasks.html?highlight=retryable#websauna.system.task.tasks.RetryableTransactionTask –

Verwandte Themen