2017-09-15 3 views
0

Ich bin neu im Luftstrom. In meiner Firma für ETL-Pipeline verwenden wir derzeit Crontab und benutzerdefinierte Scheduler (entwickelt in-house). Jetzt planen wir Apache Luftstrom für unsere alle Data Pipe zu implementieren- Linienszenarien .Für das beim Erkunden der Funktionen nicht in der Lage zu finden unique_id für jede Task Instances/Dag .Wenn ich suchte die meisten Lösungen endete in Makros und Vorlage .Aber keine von ihnen bieten keine uniqueID für eine Aufgabe. Aber ich bin in der Lage, inkrementelle uniqueID in der Benutzeroberfläche für jede Aufgaben zu sehen. Gibt es eine Möglichkeit, einfach auf diese Variablen innerhalb meiner Python-Methode zugreifen. Der Hauptanwendungsfall ist, dass ich diese IDs als Parameter aus Python/Ruby/Pentaho übergeben müssen Jobs, die als Skripte/Methoden bezeichnet werden.Getting unique_id für Apache Luftströmungsaufgaben

Für Beispiel

mein Shell-Skript ‚test.sh‘ müssen zwei Argumente man RUN_ID und andere objektgruppen wird. Derzeit erzeugen wir diese einzigartige RUN_ID von einer zentralen Datenbank und an den Arbeitsplätzen vorbei .Wenn es bereits im Luftstrom Zusammenhang wir verwenden werden, dass

from airflow.operators.bash_operator import BashOperator 
from datetime import date, datetime, timedelta 
from airflow import DAG 

shell_command = "/data2/test.sh -r run_id -c collection_id" 


putfiles_s3 = BashOperator(
       task_id='putfiles_s3', 
       bash_command=shell_command, 
       dag=dag) 

für einen einzigartigen RUN_ID Suchen (Entweder Dag Ebene/Aufgabenebene) für jeden Lauf während der Ausführung dieses Dag (geplant/manuell)

Hinweis: Dies ist eine Beispielaufgabe. Es wird mehrere abhängige Aufgabe zu diesem Dag geben. Ansetztechnologie JOB_ID Screenshot von Luftstrom UI enter image description here

Dank Anoop R

+0

Geben Sie Ihren Code ein –

+0

Haben Sie sich UUID angesehen? https://stackoverflow.com/questions/534839/how-to-create-a-guid-uuid-in-python#534851 –

+0

@MicahElliott Danke für Ihren Vorschlag. Wir können zufällige ID wie diese oder Shell zufällige Befehl generieren. Ich suchte nach einer ID, die vom Luftstrom selbst erzeugt wurde, genau wie job_id. Einen Screenshot der Airflow-Benutzeroberfläche als Referenz anhängen. –

Antwort

1

{{ ti.job_id }} ist, was Sie wollen:

from datetime import datetime, timedelta 
from airflow.operators.bash_operator import BashOperator 
from airflow import DAG 


dag = DAG(
    "job_id", 
    start_date=datetime(2018, 1, 1), 
) 

with dag: 
    BashOperator(
     task_id='unique_id', 
     bash_command="echo {{ ti.job_id }}", 
    ) 

Das zur Laufzeit gültig. Ein Protokoll dieser Ausführung wie folgt aussieht:

[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj 
[2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4 
[2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output: 
[2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4 

Beachten Sie, dass dies nur zur Laufzeit gültig sein wird, so dass die „Übertragene Template“ Blick in die webui Keine statt einer Nummer zeigen.

+0

{{ti.job_id}} kann ich mit beliebigen Operatoren verwenden und kann als Argumente an die Python-Methode übergeben auch richtig? Wenn es Ihnen nichts ausmacht Kannst du mir ein Beispiel zeigen, um diesen Wert an eine Python-Methode zu übergeben Danke Ash Berlin- Taylor. –

+0

bekam ich die Lösung, die die gleiche Python-Methode def test_failure (** kwargs) für das Bestehen: print 'Instanzvariablen aus dem Kontext Zugriff auf' ti = kwargs [ 'ti'] Druck ti.job_id @Ash Ist dort irgendwelche Dokumente, die geben, was alle Werte sind, die durch "task_instance" verfügbar sind. Diese URL erklärt nicht viel über "ti" https://pythonhosted.org/airflow/code.html#macros –

+0

nicht im Augenblick, nein. "ti" ist ein TaskInstance https://pythonhosted.org/airflow/code.html#airflow.models.TaskInstance, aber das dokumentiert keine der Eigenschaften dieses Objekts, also gehe ich zum Code. –

Verwandte Themen