2017-10-10 3 views
0

Ich lerne Airflow und habe eine einfache Frage. Unten ist meine DAG dog_retriever genanntZugriff auf die Antwort von Airflow SimpleHttpOperator GET-Anfrage

import airflow 
from airflow import DAG 
from airflow.operators.http_operator import SimpleHttpOperator 
from airflow.operators.sensors import HttpSensor 
from datetime import datetime, timedelta 
import json 



default_args = { 
    'owner': 'Loftium', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 10, 9), 
    'email': '[email protected]', 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 3, 
    'retry_delay': timedelta(minutes=3), 
} 

dag = DAG('dog_retriever', 
    schedule_interval='@once', 
    default_args=default_args) 

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2 = SimpleHttpOperator(
    task_id='get_breeds', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breeds/list', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2.set_upstream(t1) 

Als Mittel Airflow zu testen, bin ich einfach machen zwei GET-Anfragen an einige Endpunkte in diesem sehr einfachen http://dog.ceo API. Das Ziel ist es, zu lernen, wie man mit einigen über Airflow abgerufenen Daten arbeitet.

Die Ausführung funktioniert - mein Code ruft erfolgreich die Enpoints in den Tasks t1 und t2 auf, ich kann sie in der Airflow-Benutzeroberfläche in der richtigen Reihenfolge protokollieren basierend auf der set_upstream Regel schrieb ich.

Was ich nicht herausfinden kann ist, wie auf die JSON-Antwort dieser beiden Aufgaben zugreifen. Es scheint so einfach, aber ich kann es nicht herausfinden. Im SimpleHtttpOperator sehe ich einen Parameter für response_check, aber nichts, um einfach die json-Antwort zu drucken, zu speichern oder anzuzeigen.

Danke.

Antwort

2

Also, da dies SimpleHttpOperator ist und die tatsächliche JSON zu XCOM geschoben wird, und Sie können es von dort bekommen. Hier ist die Zeile Code für diese Aktion: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

Was Sie tun müssen, festgelegt ist xcom_push=True, so dass Ihre erste t1 die folgende sein:

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    xcom_push=True, 
    dag=dag) 

Sie sollten in der Lage sein, alle JSON zu finden, mit return value in XCOM, mehr Details von XCOM finden Sie unter: https://airflow.incubator.apache.org/concepts.html#xcoms

+0

danke @ Chengzhi, das funktioniert. Obwohl ich denke, ich werde den PythonOperator von nun an einfach benutzen. –

Verwandte Themen