Als ich spark-submit
für die Spart Streaming Job zu machen, kann ich sehen, dass es während approximatelly 1 Minute läuft, und es wird dann mit dem endgültigen Status SUCCEEDED
gestoppt:Spark-Streaming wird ohne Fehler nach ~ 1 Minute gestoppt
16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING)
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED)
Ich verstehe nicht, warum es gestoppt wird, während ich erwarte, dass es für eine unbestimmte Zeit läuft und durch Nachrichten aus der Kafka-Warteschlange ausgelöst wird. In Protokollen kann ich alle println
Ausgänge sehen, und es gibt keine Fehler.
Dies ist ein kurzer Auszug aus dem Code:
val conf = new SparkConf().setAppName("MYTEST")
val sc = new SparkContext(conf)
sc.setCheckpointDir("~/checkpointDir")
val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds))
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
println("Dividing the topic into partitions.")
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2)
messages.foreachRDD(msg => {
msg.foreach(s => {
if (s != null) {
//val result = ... processing goes here
//println(result)
}
})
})
// Start the streaming context in the background.
ssc.start()
Das ist mein spark-submit
Befehl:
/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2
Wenn ich Resource Manager öffnen, sehe ich, dass kein Job RUNNING
ist und die Funken Streaming Job wird als FINISHED
markiert.
Ich würde versuchen, "rootLogger.setLevel (Level.ERROR)" auskommentieren, um eine ausführlichere Ausgabe zu erhalten. Wahrscheinlich wird das, was Ihren Job erledigt, nicht mit 'ERROR' gekennzeichnet, so dass es aus den Logs herausgefiltert wird. Außerdem sieht es so aus, als ob Sie am Ende Ihres Codes Ihren Aufruf an "ssc.waitTermination" verpasst haben. –
@EricM .: Ok, danke. Lass es mich ohne diese Codezeile testen. Ich werde Ihnen das Ergebnis in ein paar Minuten erzählen. – duckertito
@EricM .: Ich habe versucht, es mit 'ssc.awaitTermination' zu starten, aber es gab das gleiche Problem. Aber lass es mich trotzdem nochmal überprüfen. – duckertito