2016-10-03 5 views
10

Hallo Leute der Erde! Ich verwende Airflow, um Spark-Aufgaben zu planen und auszuführen. Alles, was ich zu diesem Zeitpunkt gefunden habe, sind Python-DAGs, die Airflow verwalten kann.
DAG Beispiel:So führen Sie den Spark-Code im Airflow aus?

spark_count_lines.py 
import logging 

from airflow import DAG 
from airflow.operators import PythonOperator 

from datetime import datetime 

args = { 
    'owner': 'airflow' 
    , 'start_date': datetime(2016, 4, 17) 
    , 'provide_context': True 
} 

dag = DAG(
    'spark_count_lines' 
    , start_date = datetime(2016, 4, 17) 
    , schedule_interval = '@hourly' 
    , default_args = args 
) 

def run_spark(**kwargs): 
    import pyspark 
    sc = pyspark.SparkContext() 
    df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt') 
    logging.info('Number of lines in people.txt = {0}'.format(df.count())) 
    sc.stop() 

t_main = PythonOperator(
    task_id = 'call_spark' 
    , dag = dag 
    , python_callable = run_spark 
) 

Das Problem ist, ich in Python-Code nicht gut bin und einige Aufgaben in Java geschrieben haben. Meine Frage ist, wie Spark Java jar in Python DAG ausführen? Oder vielleicht gibt es einen anderen Weg, es zu tun? Ich habe einen Funken gefunden: http://spark.apache.org/docs/latest/submitting-applications.html
Aber ich weiß nicht, wie man alles miteinander verbindet. Vielleicht hat jemand es vorher benutzt und hat ein funktionierendes Beispiel. Vielen Dank für Ihre Zeit!

Antwort

9

Sie sollten BashOperator verwenden können. Halten Sie den Rest des Codes wie es ist, Import erforderlich Klasse und Systempakete:

from airflow.operators.bash_operator import BashOperator 

import os 
import sys 

Satz erforderlichen Pfade:

os.environ['SPARK_HOME'] = '/path/to/spark/root' 
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin')) 

und fügen Betreiber:

spark_task = BashOperator(
    task_id='spark_java', 
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}', 
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'}, 
    dag=dag 
) 

Sie leicht diese verlängern um zusätzliche Argumente mithilfe von Jinja-Vorlagen bereitzustellen.

Sie können dies natürlich anpassen für Nicht-Spark-Szenario durch bash_command mit einer Vorlage geeignet in Ihrem Fall, zum Beispiel zu ersetzen:

bash_command = 'java -jar {{ params.jar }}' 

und Einstellen params.

6

Airflow ab Version 1.8 (veröffentlicht heute), hat

SparkSQLHook Code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHook Code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

Beachten Sie, dass diese beiden neuen Spark-Betreiber/Haken in "contrib" Zweig sind ab 1.8 Version also nicht (gut) dokumentiert.

So können Sie SparkSubmitOperator verwenden, um Ihren Java-Code für Spark-Ausführung einzureichen.

+0

Der SparkSQLOperator sieht aus, als wäre es genau das, was ich brauche - aber ich kann es nicht zum Laufen bringen, weil ich nicht weiß, wie die Verbindungszeichenfolge aussehen soll - gibt es irgendwelche Unterlagen, die mir dabei helfen können? –

+0

Wenn Sie es nicht festlegen - die Verbindung wird standardmäßig auf den Garnausführungsmodus gesetzt - siehe https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33 – Tagar

Verwandte Themen