2017-06-13 5 views
1

kann ich Makros mit dem PythonOperator verwenden? Ich habe versucht, zu folgen, aber ich war nicht in der Lage, die Makros gerendert zu bekommen!Makros im Airflow Python-Operator

dag = DAG(
    'temp', 
    default_args=default_args, 
    description='temp dag', 
    schedule_interval=timedelta(days=1)) 

def temp_def(a, b, **kwargs): 
    print '{{ds}}' 
    print '{{execution_date}}' 
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)) 

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = PythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 

Antwort

7

Makros werden nur für Template-Felder verarbeitet. Um Jinja dazu zu bringen, dieses Feld zu bearbeiten, erweitern Sie die PythonOperator um Ihre eigenen.

class MyPythonOperator(PythonOperator): 
    template_fields = ('templates_dict','op_args') 

Ich habe 'templates_dict' zum template_fields weil die PythonOperator selbst dieses Feld hat Templat: PythonOperator

Jetzt sollten Sie in der Lage sein, einen Makro in diesem Feld zu verwenden:

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 
+1

können wir dies als die richtige Antwort markieren? Weil es die richtige Antwort ist –

+1

Aus Gründen der Abwärtskompatibilität können Sie 'template_fields' folgendermaßen überschreiben:' template_fields = PythonOperator.template_fields + ('op_args',) '. BTW, ich öffnete eine [JIRA, um 'op_args' und' op_kwargs' zu 'PythonOperator' Vorlagenfeldern hinzuzufügen] (https://issues.apache.org/jira/browse/AIRFLOW-1814) –

1

In meinem Eine native Airflow-Methode wäre es, den mitgelieferten PythonOperator zu verwenden und den -Parameter als solchen zu verwenden.

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    provide_context=True, 
    dag=dag) 

Jetzt haben Sie Zugriff auf alle Makros, Luftstrom-Metadaten und Task-Parameter in der kwargs Ihrer aufrufbar

def temp_def(**kwargs): 
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date'])) 

Wenn Sie hatte einige benutzerdefinierte params mit der Aufgabe zugeordnet ist definiert Sie diejenigen zugreifen können auch über kwargs['params']

+0

Dies ist wahrscheinlich der bessere Weg es tun. Meine Antwort war hauptsächlich auf die spezifische Frage gerichtet, warum die Makros nicht verarbeitet wurden. – jhnclvr