2017-03-23 5 views
2

abgerufen wird Ich habe die folgende DAG mit zwei SSHExecuteOperator-Tasks. Die erste Task führt eine gespeicherte Prozedur aus, die einen Parameter zurückgibt. Die zweite Aufgabe benötigt diesen Parameter als Eingabe.So rufen Sie einen Wert von Airflow XCom ab, der über SSHExecuteOperator

Könnte bitte erklären, wie man den Wert von der XCom, die in Task1 gedrückt wurde, zieht, um sie in Task2 zu verwenden?

from airflow import DAG 
from datetime import datetime, timedelta 
from airflow.contrib.hooks.ssh_hook import SSHHook 
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator 
from airflow.models import Variable 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'retries': 0 
} 

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin 
DataQualitySSHHook = Variable.get('DataQualitySSHHook') 
print('Connecting to: ' + DataQualitySSHHook) 
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook) 
sshHookEtl.no_host_key_check = True 

#create dag 
dag = DAG(
    'ed_data_quality_test-v0.0.3', #update version whenever you change something 
    default_args=default_args, 
    schedule_interval="0 0 * * *", 
    dagrun_timeout=timedelta(hours=24), 
    max_active_runs=1) 

#create tasks 
task1 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_batch_register', 
    bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end 
    ssh_hook=sshHookEtl, 
    xcom_push=True, 
    retries=0, 
    dag=dag) 

task2 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_module_session_start', 
    bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}", 
    ssh_hook=sshHookEtl, 
    retries=0, 
    dag=dag) 

#create dependencies 
task1.set_downstream(task2) 
+0

Ihre DAG Definition scheint in Ordnung. Können Sie die DAG erfolgreich ausführen? Irgendwelche Fehler? –

Antwort

1

So ist die Lösung, die ich gefunden habe, ist, wenn task1 das Shell-Skript ausgeführt wird, müssen Sie die Parameter, die Sie durch die XCom Variable erfasst werden sicherstellen wollen, ist das letzte, was von Ihrem Skript gedruckt (echo).

Dann konnte ich die XCom Variablenwert mit dem folgenden Code-Snippet abzurufen:

{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}

Verwandte Themen