Ich führe im Cluster-Modus Funke-Jobs auf Garn aus. Der Job erhält die Nachrichten von kafka direct stream. Ich verwende Broadcast-Variablen und Checkpointing alle 30 Sekunden. Wenn ich den Job zum ersten Mal beginne, läuft es einwandfrei ohne Probleme. Wenn ich den Job zu töten, und starten Sie es unter Ausnahme in Testamentsvollstrecker wirft auf eine Nachricht von kafka Empfang:Fehler beim Senden von "broadcast_1_piece0" von "broadcast_1" im Spark-Streaming-Job
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:177)
at net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Hat jemand Ahnung, wie diese Fehler zu beheben?
Spark-Version: 1.5.0
CDH 5.5.1
Ich habe überprüft, der Job war nicht tot. Ich führe drei Funken Job und erstellt Prüfpunkt Verzeichnisse/tmp/spark/,/tmp/Funken,/tmp/spark/. Wenn ich die Checkpoint-Verzeichnisse lösche und dann den Job starte, läuft er ohne Probleme. Höchstwahrscheinlich ist es nicht möglich, die Broadcast-Variablen aus Checkpoint-Daten zu initialisieren. –
dev
nicht sicher, ich verstehe. Es scheint, dass das Problem ist, dass, wenn Sie den Job stoppen, es nicht getötet wird und der Prozess läuft weiter. Ein Workaround wäre, nur ein Skript zu erstellen, das den Prozess abbricht und den Job stoppt. Fehle ich etwas? –
Entschuldigung für Tippfehler, in meinem früheren Kommentar wollte ich erwähnen, dass der Job tot war, das hatte ich verifiziert. – dev