Ich habe den folgenden Workflow erstellt, der dynamisch Aufgaben hinzugefügt hat. Aber Airflow ist nicht in der Lage, die Knoten Task
, Join
und zum Dag hinzuzufügen. In der graphischen Darstellung kann ich nur die START
und die END
Knoten sehen. Gibt es irgendetwas, was ich hier falsch mache?Problem mit der Erstellung von dynamischen Airflow-Aufgaben
Danke.
dag = DAG(
'ddl_ver1',
default_args=default_args,
schedule_interval='*/5 * * * *'
)
start_node = DummyOperator(task_id='ddl_start',
dag=dag)
end_node = DummyOperator(task_id='ddl_finish',
dag=dag)
def create_qts(account_id):
qts = []
for i in range(7):
qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i),
dag=dag)
qts.append(query)
return qts
def create_data_discovery_tasks(accounts):
for account_id in accounts:
task = DummyOperator(
task_id='ddl_task_' + str(account_id),
dag=dag)
join = DummyOperator(
task_id='ddl_join_' + str(account_id),
dag=dag)
qts = create_qts(account_id)
for qt in qts:
qt.set_upstream(task)
qt.set_downstream(join)
task.set_upstream(START)
join.set_downstream(END)