2015-09-28 8 views
6

Wenn spokeJob auf einem Cluster über eine bestimmte Datengröße (~ 2,5GB) ausgeführt wird, bekomme ich entweder "Job abgebrochen, weil SparkContext heruntergefahren wurde" oder "Executor verloren ". Wenn ich Garngu ansehe, sehe ich, dass der Job, der getötet wurde, erfolgreich war. Es gibt keine Probleme beim Ausführen von Daten mit 500 MB. Ich habe nach einer Lösung gesucht und festgestellt, dass: - "scheint Garn tötet einige der Executoren, wie sie mehr Speicher anfordern als erwartet.""sparkContext wurde heruntergefahren", während Funke auf einem großen Dataset ausgeführt wurde

Irgendwelche Vorschläge, wie man es debuggt?

Befehl, den ich mit meinem Funkenjob übergeben:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments 

und sparkContext Einstellungen

val sparkConf = (new SparkConf() 
    .set("spark.driver.maxResultSize", "21g") 
    .set("spark.akka.frameSize", "2011") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.dir", configVar.sparkLogDir) 
    ) 

Vereinfachte Code,

sehen aus wie der

val hc = new org.apache.spark.sql.hive.HiveContext(sc) 
val broadcastParser = sc.broadcast(new Parser()) 

val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles) 
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser)) 

val allWords= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .count 

val wordQuantiles= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) 
    .map(pair => (pair._2 , pair._2)) 
    .reduceByKey(_+_) 
    .sortBy(_._1) 
    .collect 
    .scanLeft((0,0.0)) ((res,add) => (add._1, res._2+add._2)) 
    .map(entry => (entry._1,entry._2/allWords)) 

val dictionary = featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) // here I have Rdd of word,count tuples 
    .filter(_._2 >= moreThan) 
    .filter(_._2 <= lessThan) 
    .filter(_._1.trim!=("")) 
    .map(_._1) 
    .zipWithIndex 
    .collect 
    .toMap 

Und Fehler-Stack nicht

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702) 
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511) 
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435) 
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715) 
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185) 
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714) 
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) 
at org.apache.spark.rdd.RDD.count(RDD.scala:1121) 
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50) 
at sparkTesting.Runner$.main(Runner.scala:133) 
at sparkTesting.Runner.main(Runner.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
+4

Meiner Erfahrung nach ist dies fast immer auf OOM-Ausnahmen zurückzuführen. Versuchen Sie, sich die Protokolldateien auf den einzelnen Executormaschinen anzusehen. –

+1

Ich würde printstacktrace von Ihrem Job und überwachen JVM-Heap-Größe mit einigen Java-Util-Tools: jstat, jstatd, jconsole ... um mehr über die Begrenzung zu erfahren. Falls Sie noch über physischen Speicher verfügen, können Sie die JVM-Speichergröße erhöhen, bevor Sie Ihre App starten! Sie können Ihre Sammlungen basierend auf Ihrer optimierten Heap-Größe anpassen. –

Antwort

4

Die Antwort gefunden.

Die meine Tabelle wurde als 20 GB AVRO-Datei gespeichert. Wenn Executoren versuchten, es zu öffnen. Jeder von ihnen musste 20 GB in den Speicher laden. Gelöst wurde es mit csv anstelle von avro

1

Symptome sind typisch für einen OutOfMemory-Fehler in einem der Executor-Aufgaben. Versuchen Sie, den Arbeitsspeicher beim Ausführen von Jobs zu erweitern. Siehe Parameter --executor-memory von saprk-submit, funken-shell usw. Der Standardwert ist 1G

1

Eine weitere mögliche Ursache für den Fehler "SparkContext is shutdown" ist, dass Sie eine JAR-Datei importieren, nachdem Sie einen anderen Code ausgewertet haben. (Dies kann nur in Spark Notebook passieren.)

Um das Problem zu beheben, verschieben Sie alle Ihre :cp myjar.jar Anweisungen an den Anfang Ihrer Datei.

Verwandte Themen