2016-04-04 3 views
0

Ich führe im Cluster-Modus Funke-Jobs auf Garn aus. Der Job erhält die Nachrichten von kafka direct stream. Ich verwende Broadcast-Variablen und Checkpointing alle 30 Sekunden. Wenn ich den Job zum ersten Mal beginne, läuft es einwandfrei ohne Probleme. Wenn ich den Job zu töten, und starten Sie es unter Ausnahme in Testamentsvollstrecker wirft auf eine Nachricht von kafka Empfang:Fehler beim Senden von "broadcast_1_piece0" von "broadcast_1" im Spark-Streaming-Job

java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:177) 
    at net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:1) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Hat jemand Ahnung, wie diese Fehler zu beheben?

Spark-Version: 1.5.0

CDH 5.5.1

Antwort

0

Wenn Probleme stoßen, wo nur der erste Lauf funktioniert, ist es immer in Fragen führte die Checkpoint-Daten revolvierenden. Darüber hinaus erfolgt die Verwendung von Checkpoints nur dann, wenn etwas zu prüfen ist, was die erste Nachricht von kafka ist. Ich schlage vor, Sie überprüfen, ob der Job tatsächlich tot ist, das heißt, vielleicht läuft der Prozess noch auf dem Rechner, der ihn ausgeführt hat. versuchen, eine einfache ps -fe laufen und sehen, ob etwas noch läuft. Wenn es zwei Prozesse gibt, die versuchen, denselben Checkpoint-Ordner zu verwenden, wird es immer fehlschlagen. hoffe das hilft

+0

Ich habe überprüft, der Job war nicht tot. Ich führe drei Funken Job und erstellt Prüfpunkt Verzeichnisse/tmp/spark/,/tmp/Funken ,/tmp/spark/. Wenn ich die Checkpoint-Verzeichnisse lösche und dann den Job starte, läuft er ohne Probleme. Höchstwahrscheinlich ist es nicht möglich, die Broadcast-Variablen aus Checkpoint-Daten zu initialisieren. – dev

+0

nicht sicher, ich verstehe. Es scheint, dass das Problem ist, dass, wenn Sie den Job stoppen, es nicht getötet wird und der Prozess läuft weiter. Ein Workaround wäre, nur ein Skript zu erstellen, das den Prozess abbricht und den Job stoppt. Fehle ich etwas? –

+0

Entschuldigung für Tippfehler, in meinem früheren Kommentar wollte ich erwähnen, dass der Job tot war, das hatte ich verifiziert. – dev

Verwandte Themen