Hallo Leute der Erde! Ich verwende Airflow, um Spark-Aufgaben zu planen und auszuführen. Alles, was ich zu diesem Zeitpunkt gefunden habe, sind Python-DAGs, die Airflow verwalten kann.
DAG Beispiel:So führen Sie den Spark-Code im Airflow aus?
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
Das Problem ist, ich in Python-Code nicht gut bin und einige Aufgaben in Java geschrieben haben. Meine Frage ist, wie Spark Java jar in Python DAG ausführen? Oder vielleicht gibt es einen anderen Weg, es zu tun? Ich habe einen Funken gefunden: http://spark.apache.org/docs/latest/submitting-applications.html
Aber ich weiß nicht, wie man alles miteinander verbindet. Vielleicht hat jemand es vorher benutzt und hat ein funktionierendes Beispiel. Vielen Dank für Ihre Zeit!
Der SparkSQLOperator sieht aus, als wäre es genau das, was ich brauche - aber ich kann es nicht zum Laufen bringen, weil ich nicht weiß, wie die Verbindungszeichenfolge aussehen soll - gibt es irgendwelche Unterlagen, die mir dabei helfen können? –
Wenn Sie es nicht festlegen - die Verbindung wird standardmäßig auf den Garnausführungsmodus gesetzt - siehe https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33 – Tagar