2017-05-31 3 views
6

Ich habe einen SubDAG im Luftstrom mit einem lang andauernden Schritt (normalerweise etwa 2 Stunden, obwohl er je nach Gerät variiert). Unter 1.7.1.3 würde dieser Schritt konsistent AIRFLOW-736 verursachen, und die SubDAG würde im Zustand 'running' stehen bleiben, wenn alle Schritte erfolgreich waren. Wir könnten dies umgehen, da wir keine Schritte nach der SubDAG hatten, indem wir den SubDagOperator manuell als erfolgreich (anstatt zu laufen) in der Datenbank markiert haben.Airflow - lang andauernde Aufgabe in SubDag, die nach einer Stunde als ausgefallen markiert wurde

Wir testen Airflow 1.8.1 jetzt, Modernisierung durch folgende Maßnahmen:

  1. unsere Planer Shuting und der Arbeitskräfte
  2. Via pip, Luftstrom zu deinstallieren und die Installation von Apache-Airflow (Version 1.8.1)
  3. Runing Luftstrom upgradedb
  4. den Luftstrom Planer und Arbeiter Lauf

Mit dem System o ansonsten bleibt die gleiche DAG nun zu 100% der Zeit stehen, nachdem die lang andauernde Aufgabe die 1-Stunden-Marke erreicht hat (obwohl sie merkwürdig ist, nicht genau 3600 Sekunden später - sie kann zwischen 30 und 90 Sekunden nach einer Stunde liegen). mit der Meldung "Executor meldet Task-Instanz beendet (fehlgeschlagen), obwohl die Task die Ausführung angibt. Wurde die Aufgabe extern ausgeführt? ". Die Aufgabe selbst läuft jedoch weiterhin unvermindert auf dem Arbeiter. Irgendwie gibt es Meinungsverschiedenheiten zwischen dem Scheduler und der Tatsache, dass die Aufgabe trotz der eigentlichen Aufgabe auf der Grundlage der Datenbank fehlgeschlagen ist (siehe this line von jobs.py) läuft gut

Ich habe bestätigt, dass der Status in der task_instance-Tabelle der Luftströmungsdatenbank irgendwie "fehlgeschlagen" ist. Daher würde ich gerne wissen, was den Taskstatus als fehlgeschlagen markieren könnte selbst läuft noch

Hier ist ein Beispiel dag die das Problem löst.

from datetime import datetime 
from airflow.models import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.subdag_operator import SubDagOperator 

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)} 

def define_sub(dag, step_name, sleeptime): 
    op = BashOperator(
     task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag 
    ) 
    return dag 

def gen_sub_dag(parent_name, step_name, sleeptime): 
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS) 
    define_sub(sub, step_name, sleeptime) 
    return sub 

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None) 

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent 
) 
+0

Heute lief ich in das gleiche Problem, ein Subdag mit einer lang laufenden Aufgabe, nach etwas mehr als einer Stunde, habe ich eine Fehlermeldung erhalten. Interessanterweise versuchte der Scheduler, die Aufgabe neu zu starten, die aufgrund einer blockierten Ressource außerhalb des Luftstroms fehlschlug. Die ursprüngliche Aufgabe wurde fortgesetzt und korrekt beendet. Der Luftstrom wurde als fehlgeschlagen markiert, bevor die Aufgabe beendet wurde. –

+0

Welchen Executor verwenden Sie? Ist es Sellerie + Redis? –

Antwort

0

Wenn Sie tatsächlich mit Sellerie und Redis laufen, werfen Sie einen Blick auf die visibility timeout setting für Sellerie und erhöhen Sie sie über die erwartete Endzeit Ihrer Aufgabe hinaus.

Obwohl wir Sellerie zu Aufgaben-Ack-Late konfigurieren, hat es immer noch Probleme mit dem Verschwinden von Aufgaben. Wir betrachten dies als a bug in Sellerie.

Verwandte Themen