2016-11-16 2 views
2

Als ich spark-submit für die Spart Streaming Job zu machen, kann ich sehen, dass es während approximatelly 1 Minute läuft, und es wird dann mit dem endgültigen Status SUCCEEDED gestoppt:Spark-Streaming wird ohne Fehler nach ~ 1 Minute gestoppt

16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING) 
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED) 

Ich verstehe nicht, warum es gestoppt wird, während ich erwarte, dass es für eine unbestimmte Zeit läuft und durch Nachrichten aus der Kafka-Warteschlange ausgelöst wird. In Protokollen kann ich alle println Ausgänge sehen, und es gibt keine Fehler.

Dies ist ein kurzer Auszug aus dem Code:

val conf = new SparkConf().setAppName("MYTEST") 
val sc = new SparkContext(conf) 
sc.setCheckpointDir("~/checkpointDir") 

val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds)) 

val rootLogger = Logger.getRootLogger() 
rootLogger.setLevel(Level.ERROR) 

println("Dividing the topic into partitions.") 
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap 
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2) 

messages.foreachRDD(msg => { 
    msg.foreach(s => { 
    if (s != null) { 
     //val result = ... processing goes here 
     //println(result) 
    } 
    }) 
}) 

// Start the streaming context in the background. 
ssc.start() 

Das ist mein spark-submit Befehl:

/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \ 
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2 

Wenn ich Resource Manager öffnen, sehe ich, dass kein Job RUNNING ist und die Funken Streaming Job wird als FINISHED markiert.

+0

Ich würde versuchen, "rootLogger.setLevel (Level.ERROR)" auskommentieren, um eine ausführlichere Ausgabe zu erhalten. Wahrscheinlich wird das, was Ihren Job erledigt, nicht mit 'ERROR' gekennzeichnet, so dass es aus den Logs herausgefiltert wird. Außerdem sieht es so aus, als ob Sie am Ende Ihres Codes Ihren Aufruf an "ssc.waitTermination" verpasst haben. –

+0

@EricM .: Ok, danke. Lass es mich ohne diese Codezeile testen. Ich werde Ihnen das Ergebnis in ein paar Minuten erzählen. – duckertito

+0

@EricM .: Ich habe versucht, es mit 'ssc.awaitTermination' zu starten, aber es gab das gleiche Problem. Aber lass es mich trotzdem nochmal überprüfen. – duckertito

Antwort

0

In Ihrem Code fehlt ein Anruf an ssc.awaitTermination, um den Treiber-Thread zu blockieren.

Leider gibt es keine einfache Möglichkeit, die Ausdrucke innerhalb Ihrer map-Funktion auf der Konsole zu sehen, da diese Funktionsaufrufe in YARN-Executoren stattfinden. Cloudera Manager bietet jedoch einen anständigen Einblick in die Protokolle, und wenn Sie sie wirklich benötigen, können Sie auf einen Speicherort in HDFS schreiben und dann die verschiedenen Protokolle von dort selbst abkratzen. Wenn die Informationen, die Sie verfolgen möchten, rein numerisch sind, können Sie auch eine Accumulator verwenden.