2017-07-12 3 views
1

Gibt es eine Möglichkeit, das Airflow-Makro außerhalb von Bedienern zu verwenden?Luftstrom: Verwenden des Makros außerhalb der Bediener

Zum Beispiel in der DAG habe ich eine Aktion:

datestamp = '{{ ds }}' 

print(datestamp) # prints string not the date when I run it for any date 

scanner = S3KeySensor(
     task_id='scanner', 
     poke_interval=60, 
     timeout=24 * 60 * 60, 
     soft_fail=False, 
     wildcard_match=True, 
     bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date 
     bucket_name=bucketName, 
     dag=dag) 

Also, wenn Scanner Aufruf „ds“ Wert mit Ausführungsdatum ersetzt wird, die erwartet wird, aber ich will „ds“ Wert verwenden, in einige andere Orte. Aber in diesem Fall ersetzt es nicht den Wert, stattdessen erhält es den gesamten String als "{{ds}}". Im obigen Beispiel. print-Anweisung druckt "{{ds}}" nicht das Ausführungsdatum.

Antwort

-2

Verwenden Sie doppelte Anführungszeichen.

datestamp = "{{ ds }}" 
print datestamp 
1

Glück für Sie bucket_key ist templated, legen Sie einfach die Jinja Vorlage hinein.

Vollständig außerhalb eines Operators können Sie diese Makros nicht verwenden. Weil die Datei regelmäßig vom Scheduler interpretiert wird und nicht nur während eines dag-Laufs. Was wäre also der Wert von ds wenn der dag nicht läuft?

Da Sie jedoch außerhalb der Aufgaben wahrscheinlich nichts damit zu tun haben möchten, könnten Sie es in ein Vorlagenfeld einfügen. Sie können auch ein anderes Feld erweitern, um es zu templatieren.

class MySensor(S3KeySensor): 
    template_fields = ('bucket_key', 'bucket_name', 'my_thing') 

    def __init__(self, my_thing=None, *args, **kwargs): 
     super(MySensor, self).__init__(*args, **kwargs) 
     self.my_tyhing = my_thing 

    def post_execute(self, context): 
     logging.info(
      "I probably wanted to over-ride poke to use {}".format(my_thing) 

scanner = MySensor(
    my_thing='{{ ds }}', 
    task_id='scanner', 
    poke_interval=60, 
    timeout=24 * 60 * 60, 
    soft_fail=False, 
    wildcard_match=True, 
    bucket_key=getPath() + '{{ ds }}', 
    bucket_name=bucketName, 
    dag=dag) 
Verwandte Themen