2017-05-16 3 views
1

abrufen Ich muss in der Lage sein, auf default_args zugreifen, die als Teil der DAG-Definition in einem Python-Operator definiert sind, python_callable. Vielleicht ist es meine Unfamiliaryy mit Python oder Luftströmung im Allgemeinen, aber könnte jemand führen, wie man das erreicht.Wie Standard-Args in Python aufrufbar

Es folgt ein Codebeispiel von dem, was

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'email': '[email protected]', 
    'email_on_failure': '[email protected]', 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    'start_date': datetime(2017, 5, 15, 23, 20), 
    'end_date': datetime(2017, 5, 16, 23, 45), 
    'touchfile_path': '/user/myname/touchfiles/', 
} 

dag = DAG(
    'test', 
    default_args=default_args, 
    template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'], 
    user_defined_macros=dict(SCHEMA_NAME='abc'), 
    #schedule_interval='*/2 * * * * ') 
    schedule_interval='@once') 

def webhdfs_touchfile_create(ds, *args, **kwargs): 
    web_hdfs_hook = WebHDFSHook('webhdfs_default') 
    client = web_hdfs_hook.get_conn() 
    client.write("/user/myname/airflow_hdfs","stringToWrite") 
    pp.pprint(kwargs) 

task1 = PythonOperator(
    task_id='task1', 
    provide_context=True, #enabling this would allow to pass arguments automatically to your callable function 
    python_callable=webhdfs_touchfile_create, 
    templates_dict={'attr1': {{ default_args['touchfile_path'] }}}, 
    dag=dag) 

Da die template_dict für PythonOperator zu erreichen versuchen, das einzige Attribut, das jinja Templating Werke ist, wie kann ich den ‚touchfile_path‘ Paramter dort abrufen?

Antwort

1

Da sie in der gleichen Datei auf der gleichen Ebene definiert sind, können Sie tun:

def webhdfs_touchfile_create(ds, *args, **kwargs): 
    web_hdfs_hook = WebHDFSHook('webhdfs_default') 
    client = web_hdfs_hook.get_conn() 
    client.write("/user/myname/airflow_hdfs","stringToWrite") 
    pp.pprint(kwargs) 
    pp.pprint(default_args['touchfile_path']) 
Verwandte Themen