2017-04-05 2 views
0

Ich habe den folgenden Workflow erstellt, der dynamisch Aufgaben hinzugefügt hat. Aber Airflow ist nicht in der Lage, die Knoten Task, Join und zum Dag hinzuzufügen. In der graphischen Darstellung kann ich nur die START und die END Knoten sehen. Gibt es irgendetwas, was ich hier falsch mache?Problem mit der Erstellung von dynamischen Airflow-Aufgaben

Danke.

dag = DAG(
    'ddl_ver1', 
    default_args=default_args, 
    schedule_interval='*/5 * * * *' 
) 


start_node = DummyOperator(task_id='ddl_start', 
         dag=dag) 


end_node = DummyOperator(task_id='ddl_finish', 
         dag=dag) 


def create_qts(account_id): 
    qts = [] 
    for i in range(7): 
     qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i), 
         dag=dag) 
     qts.append(query) 

    return qts 



def create_data_discovery_tasks(accounts): 
    for account_id in accounts: 
     task = DummyOperator(
      task_id='ddl_task_' + str(account_id), 
      dag=dag) 

     join = DummyOperator(
      task_id='ddl_join_' + str(account_id), 
      dag=dag) 

     qts = create_qts(account_id) 

     for qt in qts: 
      qt.set_upstream(task) 
      qt.set_downstream(join) 

     task.set_upstream(START) 

     join.set_downstream(END) 

Antwort

0

create_qts ist nie wirklich die Art und Weise genannt es ist.

Sie müssen auch entweder set_downstream oder set_upstream aufrufen, wenn Sie beim Start beginnen möchten, legen Sie die generierten Aufgaben in die Mitte und enden Sie dann am Ende.

Das wird gut funktionieren (man beachte die sehr letzte Zeile):

from airflow.models import DAG 
from airflow.operators.dummy_operator import DummyOperator 
from datetime import datetime 

dag = DAG(
    'ddl_ver1', 
    schedule_interval='*/5 * * * *', 
    start_date=datetime(2017,4,30) 
) 


start_node = DummyOperator(task_id='ddl_start', 
         dag=dag) 


end_node = DummyOperator(task_id='ddl_finish', 
         dag=dag) 


def create_qts(account_id): 
    previous_qt = None 
    for i in range(7): 

     qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i), 
         dag=dag) 

     if previous_qt: 
      previous_qt.set_downstream(qt) 
     else: 
      start_node.set_downstream(qt) 

     previous_qt = qt 

    qt.set_downstream(end_node) 

create_qts(123) 
Verwandte Themen