Wir haben vor kurzem versucht, Airflow als unsere "Datenworkflow" Engine zu übernehmen, und während ich die meisten Dinge herausgefunden habe, bin ich immer noch im grauen Bereich, wie der Scheduler berechnet, wann DAGs ausgelöst werden sollen.Airflow DAG Triggerung
Werfen Sie einen Blick auf diese einfache dag:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now()
}
with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='date',
dag=dag)
Der Zeitplan wird diese abholen, aber wird es nicht ausführen. Nun, wenn ich die "start_date" ändern:
datetime(year=xxxx,month=yyyy=day=zzzz)
wo xxxx, yyyy, zzzz heutigen Datum sind, beginnen sie ausgeführt wird. Der Grund dafür ist, dass der Scheduler diese dags immer wieder aus dem Quell-dag-Ordner liest und dabei jedesmal datetime.now() ausführt, wobei das Startdatum anders ist als das derzeitig eingereihte, dieses dag wieder hinzufügt und daher neu plant/Drängen Sie das Ausführungsdatum vorwärts (mein dag_dir_list_interval ist 300)?
Auch in Luftstrom, wie ich es verstehe, wenn ein dag un-Pause geschaltet ist (oder mit dags_are_paused_at_creation = False hinzugefügt), wird der Scheduler die Ausführung planen wie folgt:
- 1. dag Ausführung: instant nach (+ start_date Intervall)
- dag 2. Durchführung: Zeitpunkt nach (start_date + (Intervall * 2))
- dag 3. Durchführung: Zeitpunkt nach (start_date + (Intervall * 3))
Ist das richtig angenommen?
UPDATE (7/30/2017)
Basierend auf der Annahme, oben, ich diese dag heute erstellt (2017.07.30):
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date':
datetime(year=2017,month=7,day=30,hour=20,minute=10)
}
with DAG('test_dag_100', schedule_interval="*/10 * * * *",
default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task_100',
bash_command='date',
dag=dag)
die beginnen zu müssen (UTC):
- 7/30/2017 20:20:00
- 7/30/2017 20:30:00
- 30.07.2017 20:40:00
Leider geschieht dies nicht. Hier sind einige Screenshots von meinem Armaturenbrett:
Kann jemand erklären, warum auf 20.21.00 die dag wurde nicht ausgeführt? nach 20:31:00 hat es immer noch nicht ausgeführt ... Was fehlt mir hier?
Übrigens, ich bemerkte auch, dass aus irgendeinem Grund, dass jedes Mal, wenn ich gehe und einen dag manuell durch das Armaturenbrett starten, es gerade in der "running" Bühne sitzt. Warum ist das? Hat das manuelle Ausschalten etwas mit den Start-Timing-Optionen zu tun (Startdatum/Intervall/etc)?
Vielen Dank für alle Präzisierungen Sie richtig
Zeitplan Intervall ist crontab, können Sie versuchen, https://contab.guru/ zu testen, welches Intervall für Sie bedeutet. Wenn Sie 1.8 + verwenden, wird datetime.now() nicht als bewährte Methode betrachtet. Weitere Informationen finden Sie hier https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#less-forgiving- scheduler-on-dynamic-startdatum – Chengzhi