In der offiziellen Funken Dokumentation, gibt es ein Beispiel für einen Akkumulator, der in einem foreach
Anruf verwendet wird, die direkt auf einer RDD ist:Accumulator auf Cluster ausfällt, arbeitet lokal
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
ich meinen eigenen Akkumulator implementiert:
val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath) // :spark.RDD[String]
myRDD.flatMap(line => foo(line)) // line 69
def foo(line: String) = {
myCounter += 1 // line 82 throwing NullPointerException
// compute something on the input
}
println(myCounter.value)
In einer lokalen Umgebung funktioniert das gut. Allerdings, wenn ich diesen Job auf einem Funkenstandalone-Cluster mit mehreren Maschinen laufen, werfen die Arbeiter einen
13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
at MyClass$.foo(MyClass.scala:82)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
an der Leitung, die den Akkumulator myCounter
erhöht.
Meine Frage ist: Können Akkumulatoren nur in "Top-Level" anonymen Funktionen verwendet werden, die direkt auf RDDs und nicht in verschachtelten Funktionen angewendet werden? Wenn ja, warum ist mein Anruf lokal erfolgreich und schlägt auf einem Cluster fehl?
bearbeiten: erhöhte Ausführlichkeit der Ausnahme.
Konnten Sie mehr vom traceback des Arbeiters posten? –
Haben Sie 'sc.broadcast (myCounter)' ausprobiert? – Noah
Liefert 'broadcast' keinen schreibgeschützten Wert? Aus den [offiziellen API-Dokumenten] (http://spark-project.org/docs/latest/api/core/index.html#spark.SparkContext): "Senden Sie eine schreibgeschützte Variable an den Cluster und geben Sie ein Broadcast-Objekt zurück zum Lesen in verteilten Funktionen. Die Variable wird nur einmal an jeden Cluster gesendet. " – ptikobj