2016-12-21 13 views
2

Ich versuche dynamische Sequenz ETL-Jobs einzurichten, die XCOM verwenden werden, um Daten von der ersten ausgeführten Task abzurufen. Hier ist der aktuelle Code:Airflow XCOM KeyError: 'task_instance'

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime as dt, timedelta as td, date 
from airflow.models import BaseOperator 
from airflow.operators.sensors import ExternalTaskSensor 
from airflow.operators.dummy_operator import DummyOperator 
from airflow.operators.python_operator import PythonOperator 
from airflow.models import Variable 

START_DT = dt.combine(dt.today(), dt.min.time()) 
END_DT = dt.combine(dt.today(), dt.max.time()) 
NOW = dt.now() 
CURRENT_EXEC = '{{ execution_date }}' 
TODAY_MD = dt.today().strftime("%m%d") 

def datetime_range(start, end, delta): 
    """Generates the date range with time separation""" 
    current = start 
    if not isinstance(delta, td): 
      delta = td(**delta) 
    while current < end: 
     yield current 
     current += delta 

default_args = { 
     'owner': 'test', 
     'depends_on_past': False, 
     'start_date': START_DT, 
     'email': ['[email protected]'], 
     'email_on_failure': False, 
     'email_on_retry': False, 
     'queue': 'etl', 
     'retries': 1, 
     'retry_delay': td(minutes=1), 
} 

dag_name = 'SEQ_TEST_01' 

dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=td(minutes=30)) 

def seq_job(sq_dt, **kwargs): 
    for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1): 
     if sq_dt < str(dt_in): 
      curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59) 
      sequence = int(curr_seq[0]) 
      return sequence 

pycall = PythonOperator(
    task_id='seq_sensor', 
    provide_context=True, 
    python_callable=seq_job, 
    op_kwargs={'sq_dt': CURRENT_EXEC}, 
    dag=dag) 

def group(grp, **context): 
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor') 
    grp = '%0.2d' % grp 
    database = 'TEST' 
    today_date = '{{ ds_nodash }}' 
    return BashOperator(
      task_id='ETL_GRP{}_{}_{}'.format(database, sequence, gap), 
      bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp), 
      dag=dag) 

complete = DummyOperator(
     task_id='All_Sequences_complete', 
     dag=dag) 

pycall >> group(1) >> complete 
pycall >> group(2) >> complete 
pycall >> group(3) >> complete 

Problem ist, dass egal, was ich versuche, halte ich diesen Fehler:

Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file 
    m = imp.load_source(mod_name, filepath) 
    File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module> 
    pycall >> group(1) >> complete 
    File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group 
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor') 
KeyError: 'task_instance' 

nicht sicher, ob es etwas klein mir fehlt, oder wenn ich alles falsch. Immer noch neu im Luftstrom und versuchen, unseren ETL-Test env so einzurichten, dass er alle 30 Minuten mit einer eindeutigen Sequenznummer läuft, die von datetime_range generiert wird und auf der Variable execution_date basiert.

Antwort

2

Ich löste es durch die Bash-Operator auf eine andere Funktion zu bewegen, und ziehen Sie die Daten aus Python Bediener über:

def bash_out(group, **kwargs): 
     sequence = "{{ task_instance.xcom_pull(task_ids='seq_sensor') }}" 
     return BashOperator(task_id='ETL_{}_GRP{}'.format(database, group), bash_command='script.sh {} {} {} {}'.format(today_date, database, sequence, group), dag=dag) 

und die Abhängigkeiten Einstellung:

pycall >> [bash_out('01'), bash_out('02'), bash_out('03')] >> complete 
1

Verwenden Sie stattdessen context['ti'].

+0

versucht, und immer noch nicht gehen :( –