2017-09-12 21 views
0

Ich bin neu in Airflow und habe meine erste DAG erstellt. Hier ist mein DAG-Code. Ich möchte, dass die DAG jetzt startet und danach einmal am Tag rennt.Airflow DAG wird nicht geplant

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
} 

dag = DAG(
    'alamode', default_args=default_args, schedule_interval=timedelta(1)) 

create_command = "/home/ubuntu/scripts/makedir.sh " 

# t1 is the task which will invoke the directory creation shell script 
t1 = BashOperator(
    task_id='create_directory', 
    bash_command=create_command, 
    dag=dag) 

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh " 
# t2 is the task which will invoke the spiders 
t2 = BashOperator(
    task_id='web_scrawl', 
    bash_command=run_spiders, 
    dag=dag) 

# To set dependency between tasks. 't1' should run before t2 
t2.set_upstream(t1) 

Die DAG wird nicht vom Airflow ausgewählt. Ich überprüfte das Protokoll und hier ist, was es sagt.

[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue 
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode 
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance. 
    'strategies for improved performance.' % expr) 
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat 
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105 
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds 

Was genau mache ich falsch? Ich habe versucht, schedule_interval zu schedule_interval = timedelta (minutes = 1) zu ändern, um zu sehen, ob es sofort startet, aber immer noch keine Verwendung. Ich kann die Aufgaben unter der DAG wie erwartet in der Airflow-Benutzeroberfläche sehen, aber mit dem Zeitplanstatus "Kein Status". Bitte hilf mir hier.

+0

Haben Sie auf der DAG gedreht UI einschalten? – Chengzhi

+0

Ja, die Schaltfläche ist aktiviert. Trotzdem wird es nicht abgeholt. – Anju

+0

Haben Sie den Airflow Worker und den Airflow Scheduler aufgerufen? –

Antwort

0

Dieses Problem wurde anhand der folgenden Schritte gelöst:

1) Ich habe Timedelta ein viel älteres Datum für start_date und schedule_interval = (Minuten = 10). Verwendet auch ein reales Datum anstelle von datetime.now().
2) Catchup = True in DAG-Argumenten hinzugefügt.
3) Setup-Umgebungsvariable als Export AIRFLOW_HOME = pwd/airflow_home.
4) Gelöschter Luftstrom.db
5) Den neuen Code in den DAGS-Ordner verschoben
6) Der Befehl 'airflow initdb' wurde ausgeführt, um die Datenbank erneut zu erstellen.
7) Bogen um die 'ON' Schalter meiner DAG durch UI
8) Ran den Befehl 'Luftstrom-Scheduler'

Hier ist der Code, der jetzt funktioniert:

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 9, 12), 
    'email': ['[email protected]'], 
    'retries': 0, 
    'retry_delay': timedelta(minutes=15) 
} 

dag = DAG(
    'alamode', catchup=False, default_args=default_args, schedule_interval="@daily") 

# t1 is the task which will invoke the directory creation shell script 
t1 = BashOperator(
    task_id='create_directory', 
    bash_command='/home/ubuntu/scripts/makedir.sh ', 
    dag=dag) 


# t2 is the task which will invoke the spiders 
t2 = BashOperator(
    task_id= 'web_crawl', 
    bash_command='/home/ubuntu/scripts/crawl_spiders.sh ', 
    dag=dag) 

# To set dependency between tasks. 't1' should run before t2 
t2.set_upstream(t1) 
Verwandte Themen