2015-09-15 9 views
5

Ich benutze Luigi, um eine Pipeline zu starten. Nehmen wir ein einfaches exemple nehmenKann Luigi eine Ausnahme propagieren oder ein Ergebnis zurückgeben?

task = myTask() 
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1) 
w.add(task) 
w.run() 

Lassen Sie uns jetzt sagen, dass myTask eine Ausnahme während der Ausführung erzieht. Alles, was ich haben kann, ist ein Log von Luigi, der die Ausnahme zeigt.

Gibt es eine Möglichkeit, dass Luigi es propagieren könnte oder zumindest einen failure Status zurückgeben?

Ich wäre dann in der Lage, mein Programm in Abhängigkeit von diesem Zustand reagieren zu lassen.

Danke.

BEARBEITEN Ich habe vergessen zu spezifizieren, dass luigis Ausgaben eine Datenbank zielen, wenn ich das Ergebnis speichere. Wenn eine Ausnahme ausgelöst wird, wird kein Ergebnis gespeichert, aber die Ausnahme wird nicht a luigi verbreitet. Ich habe mich gefragt, ob Luigi eine Option dafür haben könnte.

+0

Was meinen Sie propagieren es oder einen Fehlerstatus zurückgeben? Überall, wo das Programm fehlschlägt, können Sie eine except-Klausel eingeben, und dann können Sie die except-Klausel verwenden, um in die luigi-Ausgabe zu schreiben. Dann könnten Sie den Luigi-Ausgang einlesen und entsprechend reagieren. –

+0

Entschuldigung, Ihr Kommentar hat mir klar gemacht, dass ich genauer auf die Art und Weise eingehen muss, wie ich Luigi und die Ausgabe verwende. Ich bearbeite. – MathiasDesch

Antwort

10

Von docs:

Luigi hat eine integrierte Ereignissystem, das Sie Rückrufe an Veranstaltungen anmelden können und lösen Sie sie von Ihrem eigenen Aufgaben. Sie können sich in einige vordefinierte Ereignisse einklinken und eigene erstellen. Jedes Ereignis-Handle ist an eine Task-Klasse gebunden und wird nur von dieser Klasse oder einer Unterklasse davon ausgelöst. Auf diese Weise können Sie mühelos Ereignisse nur aus einer bestimmten Klasse abonnieren (z. B. für Hadoop-Jobs).

Beispiel:

import luigi 

from my_tasks import MyTask 


@MyTask.event_handler(luigi.Event.FAILURE) 
def mourn_failure(task, exception): 
    """Will be called directly after a failed execution 
    of `run` on any MyTask subclass 
    """ 

    do_something() 


luigi.run() 

Luigi hat eine lot of events you can choose from. Sie können sich auch this tests ansehen, um zu erfahren, wie Sie anderen Ereignissen zuhören und darauf reagieren können.

+0

Schön, danke! – MathiasDesch

+1

Hinzufügen der neueren Version von Luigi Handle jetzt mehr Event. Es wurde einfacher, Fehler im Prozess einer Aufgabe zu behandeln. – MathiasDesch

0

Was Sie tun könnten, ist einen Fehler in die Datei schreiben. Zum Beispiel in Ihre Aufgabe, die fehlschlagen (nennen wir es TaskA):

x="" 
try: 
    do stuff 
except: 
    x="error!" 
with open('errorfile.log','w') as f: 
    f.write(x) 

Dann in einer Aufgabe, die auf diesem Fehler abhängig ist, würde diese Aufgabe TaskA erfordern. Und man könnte so etwas tun:

with open('errorfile.log','r') as f: 
    if f.read()://if anything is in the error log from TaskA 
     //error occurred 
     do stuff 
    else: 
     do other stuff 
Verwandte Themen