2017-12-24 17 views
5

In Airflow, ich bin mit dem Problem konfrontiert, dass ich die job_flow_id zu einem meiner emr-Schritte übergeben muss. Ich bin in der Lage, die job_flow_id vom Operator abzurufen, aber wenn ich die Schritte zum Übergeben an den Cluster erstellen möchte, ist der task_instance Wert nicht richtig. Ich habe den folgenden Code:Airflow - Task Instanz im EMR-Operator

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

Das Problem ist, dass, wenn ich die EMR überprüfen, anstatt die --cluster-id j-1234 im load_data Schritt zu sehen, ich sehe --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}", die mein Schritt führt zum Scheitern verurteilt.

Wie kann ich den tatsächlichen Wert innerhalb meiner Schrittfunktion erhalten?

Dank und schön Ferien

+0

haben Sie versucht, den Wert ohne Anführungszeichen anzuhängen? load_data_steps [0] ["HadoopJarStep"] ["Args"]. append ( {{task_instance.xcom_pull ('create_job_flow', Schlüssel = 'return_value')}}) –

+0

Woher bekomme ich '' 'task_instance''' Objekt von? Ich lerne immer noch, wie man es benutzt. – davideberdin

Antwort

3

finde ich heraus, dass PR dort auf Luftstrom-Repository ist über this. Das Problem ist, dass es für die Schritte in der EmrAddStepsOperator keine Vorlagen gibt. Um dieses Problem zu überwinden, habe ich folgende:

  • Erstellt ein benutzerdefinierten Operator, der von EmrAddStepsOperator
  • hinzugefügt diesem Operator als Plugin erbt
  • Genannt der neu Operator in meinem DAG-Datei

Hier der Code für den benutzerdefinierten Operator und das Plugin in der Datei custom_emr_add_step_operator.py (siehe Baum unten)

from __future__ import division, absolute_import, print_function 

from airflow.plugins_manager import AirflowPlugin 
from airflow.utils import apply_defaults 

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator 


class CustomEmrAddStepsOperator(EmrAddStepsOperator): 
    template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above 

    @apply_defaults 
    def __init__(
      self, 
      *args, **kwargs): 
     super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs) 

    def execute(self, context): 
     super(CustomEmrAddStepsOperator, self).execute(context=context) 


# Defining the plugin class 
class CustomPlugin(AirflowPlugin): 
    name = "custom_plugin" 
    operators = [CustomEmrAddStepsOperator] 

In meiner DAG Datei mit dem Namen ich das Plugin auf diese Weise

from airflow.operators import CustomEmrAddStepsOperator 

Die Struktur meines Projekts und Plugins sieht wie folgt aus:

├── config 
│   └── airflow.cfg 
├── dags 
│   ├── __init__.py 
│   └── my_dag.py 
├── plugins 
│   ├── __init__.py 
│   └── operators 
│    ├── __init__.py 
│    └── custom_emr_add_step_operator.py 
└── requirements.txt 

Wenn Sie eine IDE wie PyCharm verwenden, dieser Wille beschweren, weil es sagt, dass es das Modul nicht finden kann. Aber wenn Sie Airflow ausführen, wird dieses Problem nicht angezeigt. Denken Sie auch daran, dass Sie in Ihrem Ordner airflow.cfg auf den richtigen Ordner plugins zeigen, damit Airflow Ihr neu erstelltes Plugin lesen kann.