2017-02-03 3 views
0

Gibt es eine Möglichkeit, ein Befehlszeilenargument an Airflow BashOperator zu übergeben. Zur Zeit habe ich ein Python-Skript, das ein date-Argument akzeptiert und einige spezifische Aktivitäten ausführt, wie z. B. das Bereinigen bestimmter Ordner, die älter sind als das angegebene Datum.Übergeben eines Befehlszeilenarguments an den Luftstrom BashOperator

Vereinfacht Code mit nur einer Aufgabe, was würde Ich mag zu tun ist,

from __future__ import print_function 
from airflow.operators import BashOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 

default_args = { 
    'owner'    : 'airflow' 
    ,'depends_on_past' : False 
    ,'start_date'  : datetime(2017, 01, 18) 
    ,'email'   : ['[email protected]'] 
    ,'retries'   : 1 
    ,'retry_delay'  : timedelta(minutes=5) 
} 

dag = DAG(
    dag_id='data_dir_cleanup' 
    ,default_args=default_args 
    ,schedule_interval='0 13 * * *' 
    ,dagrun_timeout=timedelta(minutes=10) 
    ) 

cleanup_task = BashOperator(
     task_id='task_1_data_file_cleanup' 
     ,bash_command='python cleanup.py --date $DATE 2>&1 >> /tmp/airflow/data_dir_cleanup.log' 
     #--------------------------------------^^^^^^-- (DATE variable which would have been given on command line) 
     #,env=env 
     ,dag=dag 
    ) 

Vielen Dank im Voraus,

Antwort

0

Versuchen:

os.system ("Sie befehligen HERE")

0

Der BashOperator ist mit Jinja2 templatiert, was bedeutet, dass Sie beliebige Werte übergeben können. In Ihrem Fall wäre es so etwas wie:

cleanup_task = BashOperator(
     task_id='task_1_data_file_cleanup' 
     ,bash_command="python cleanup.py --date {{ DATE }} 2>&1 >> /tmp/airflow/data_dir_cleanup.log" 
     ,params = {'DATE' : 'this-should-be-a-date'} 
     ,dag=dag 
    ) 

Siehe auch: https://airflow.incubator.apache.org/tutorial.html#templating-with-jinja für ein breiteres Beispiel.

0

BashOperator ist Jinja templated, so können Params als Wörterbuch übergeben werden.

Airflow plant die Aufgabe und fordert Sie nicht zum param auf. Wenn Sie also sagen, dass Sie ein bestimmtes Datum als Befehlszeilenparameter übergeben müssen, ist das nicht möglich. Obwohl hat Airflow eine Vorstellung von EXECUTION DATE, die das Datum dag, auf dem geplant ausgeführt werden, und das kann in BashOperator params mit Makro übergeben werden {{ds}} oder {{ds_nodash}} (https://airflow.incubator.apache.org/code.html#macros)

env = {} 
env['DATE'] = '{{ ds }}' 
cleanup_task = BashOperator(
     task_id='task_1_data_file_cleanup' 
     ,bash_command='python cleanup.py --date $DATE 2>&1 >> /tmp/airflow/data_dir_cleanup.log' 
     ,params=env 
     ,dag=dag 
    ) 

das „DATE“ param Skript bash weitergegeben werden und kann mit $ DATE

+0

Ich habe diese Lösung versucht, aber DS wurde dort nicht gerendert Ich kann keinen Weg finden, ds als param zu übergeben !! – Omar14

0

Sie die folgenden können als jede andere bash Variable verwendet werden versuchen (arbeitete für mich):

cmd_command = "python path_to_task/[task_name.py] '{{ execution_date }}' '{{ prev_execution_date }}'" 

t = BashOperator(
    task_id = 'some_id', 
    bash_command = cmd_command, 
    dag = your_dag_object_name) 

als ich so tat es hat die Variablen gerendert und es hat gut funktioniert. Ich glaube, es funktioniert für alle Variablen (beachten Sie, dass ich das Wort "Python" am Anfang meines Befehls gesetzt habe, weil ich ein .py Skript ausführen möchte.

Meine Aufgabe wird richtig geschrieben, um diese Variablen zu lesen als Befehlszeilenargumente (Attribut sys.argv)

Verwandte Themen