2017-07-26 19 views
0

Wir haben vor kurzem versucht, Airflow als unsere "Datenworkflow" Engine zu übernehmen, und während ich die meisten Dinge herausgefunden habe, bin ich immer noch im grauen Bereich, wie der Scheduler berechnet, wann DAGs ausgelöst werden sollen.Airflow DAG Triggerung

Werfen Sie einen Blick auf diese einfache dag:

from airflow import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 

dag_options = {     
      'owner':    'Airflow', 
      'depends_on_past':  False,  
      'start_date':   datetime.now() 
} 

with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag: 
       task1 = BashOperator(  
       task_id='task1', 
       bash_command='date',     
       dag=dag)  

Der Zeitplan wird diese abholen, aber wird es nicht ausführen. Nun, wenn ich die "start_date" ändern:

datetime(year=xxxx,month=yyyy=day=zzzz) 

wo xxxx, yyyy, zzzz heutigen Datum sind, beginnen sie ausgeführt wird. Der Grund dafür ist, dass der Scheduler diese dags immer wieder aus dem Quell-dag-Ordner liest und dabei jedesmal datetime.now() ausführt, wobei das Startdatum anders ist als das derzeitig eingereihte, dieses dag wieder hinzufügt und daher neu plant/Drängen Sie das Ausführungsdatum vorwärts (mein dag_dir_list_interval ist 300)?

Auch in Luftstrom, wie ich es verstehe, wenn ein dag un-Pause geschaltet ist (oder mit dags_are_paused_at_creation = False hinzugefügt), wird der Scheduler die Ausführung planen wie folgt:

  • 1. dag Ausführung: instant nach (+ start_date Intervall)
  • dag 2. Durchführung: Zeitpunkt nach (start_date + (Intervall * 2))
  • dag 3. Durchführung: Zeitpunkt nach (start_date + (Intervall * 3))

Ist das richtig angenommen?

UPDATE (7/30/2017)

Basierend auf der Annahme, oben, ich diese dag heute erstellt (2017.07.30):

from airflow import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 

dag_options = {     
      'owner':    'Airflow', 
      'depends_on_past': False,  
      'start_date': 
datetime(year=2017,month=7,day=30,hour=20,minute=10) 
} 

with DAG('test_dag_100', schedule_interval="*/10 * * * *", 
default_args=dag_options) as dag: 
       task1 = BashOperator(  
       task_id='task_100', 
       bash_command='date',     
       dag=dag)  

die beginnen zu müssen (UTC):

  • 7/30/2017 20:20:00
  • 7/30/2017 20:30:00
  • 30.07.2017 20:40:00

Leider geschieht dies nicht. Hier sind einige Screenshots von meinem Armaturenbrett:

Kann jemand erklären, warum auf 20.21.00 die dag wurde nicht ausgeführt? nach 20:31:00 hat es immer noch nicht ausgeführt ... Was fehlt mir hier?

Übrigens, ich bemerkte auch, dass aus irgendeinem Grund, dass jedes Mal, wenn ich gehe und einen dag manuell durch das Armaturenbrett starten, es gerade in der "running" Bühne sitzt. Warum ist das? Hat das manuelle Ausschalten etwas mit den Start-Timing-Optionen zu tun (Startdatum/Intervall/etc)?

Vielen Dank für alle Präzisierungen Sie richtig

+0

Zeitplan Intervall ist crontab, können Sie versuchen, https://contab.guru/ zu testen, welches Intervall für Sie bedeutet. Wenn Sie 1.8 + verwenden, wird datetime.now() nicht als bewährte Methode betrachtet. Weitere Informationen finden Sie hier https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#less-forgiving- scheduler-on-dynamic-startdatum – Chengzhi

Antwort

1

Ihre Annahmen zur Verfügung stellen kann. Airflow plant den ersten DAG-Lauf nach Ablauf des angegebenen Zeitplanintervalls ab dem Startdatum. Die Verwendung von datetime.now() als Startdatum führt dazu, dass Airflow selten oder überhaupt eine DAG auslöst. Es wird in den Planungsdokumenten erwähnt.

Wenn Sie ein bestimmtes Startdatum angeben möchten, z. B. datetime (2017,7,27,1,0) mit einem Zeitplanintervall von "5 * * * *", dann am 07/27 um 01:05 Uhr Die DAG wird zum ersten Mal ausgelöst. Es wird danach alle fünf Minuten weiterlaufen.

Verwandte Themen