2013-07-22 5 views
10

In der offiziellen Funken Dokumentation, gibt es ein Beispiel für einen Akkumulator, der in einem foreach Anruf verwendet wird, die direkt auf einer RDD ist:Accumulator auf Cluster ausfällt, arbeitet lokal

scala> val accum = sc.accumulator(0) 
accum: spark.Accumulator[Int] = 0 

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 
... 
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 

scala> accum.value 
res2: Int = 10 

ich meinen eigenen Akkumulator implementiert:

val myCounter = sc.accumulator(0) 

val myRDD = sc.textFile(inputpath) // :spark.RDD[String] 

myRDD.flatMap(line => foo(line)) // line 69 

def foo(line: String) = { 
    myCounter += 1 // line 82 throwing NullPointerException 
    // compute something on the input 
} 
println(myCounter.value) 

In einer lokalen Umgebung funktioniert das gut. Allerdings, wenn ich diesen Job auf einem Funkenstandalone-Cluster mit mehreren Maschinen laufen, werfen die Arbeiter einen

13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247 
java.lang.NullPointerException 
    at MyClass$.foo(MyClass.scala:82) 
    at MyClass$$anonfun$2.apply(MyClass.scala:67) 
    at MyClass$$anonfun$2.apply(MyClass.scala:67) 
    at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400) 
    at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630) 
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640) 
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640) 
    at spark.scheduler.ResultTask.run(ResultTask.scala:77) 
    at spark.executor.Executor$TaskRunner.run(Executor.scala:98) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:722) 

an der Leitung, die den Akkumulator myCounter erhöht.

Meine Frage ist: Können Akkumulatoren nur in "Top-Level" anonymen Funktionen verwendet werden, die direkt auf RDDs und nicht in verschachtelten Funktionen angewendet werden? Wenn ja, warum ist mein Anruf lokal erfolgreich und schlägt auf einem Cluster fehl?

bearbeiten: erhöhte Ausführlichkeit der Ausnahme.

+0

Konnten Sie mehr vom traceback des Arbeiters posten? –

+0

Haben Sie 'sc.broadcast (myCounter)' ausprobiert? – Noah

+0

Liefert 'broadcast' keinen schreibgeschützten Wert? Aus den [offiziellen API-Dokumenten] (http://spark-project.org/docs/latest/api/core/index.html#spark.SparkContext): "Senden Sie eine schreibgeschützte Variable an den Cluster und geben Sie ein Broadcast-Objekt zurück zum Lesen in verteilten Funktionen. Die Variable wird nur einmal an jeden Cluster gesendet. " – ptikobj

Antwort

1

Was passiert, wenn Sie definieren die Funktion wie folgt aus:

def foo(line: String, myc: org.apache.spark.Accumulator[Int]) = { 
    myc += 1 
} 

Und es dann so nennen:

foo(line, myCounter) 

?

+0

Dies scheint korrekt zu sein. Sie können den Akkumulator, den Sie erstellt haben, an die Methode übergeben – pulasthi

-1

Wenn Sie "flatMap" verwenden, wird "myCounter" nicht aktualisiert, da "flatMap" eine lazy-Funktion ist. Sie können diesen Code verwenden:

myRDD.foreach(line => foo(line)) 
def foo(line: String) = {myCounter +=1} 
println(myCounter.value) 
2

In meinem Fall auch Akkumulator war null in Schließung, wenn ich verwenden ‚erstreckt App‘ einen Funken Anwendung zu erstellen, wie unten

object AccTest extends App { 


    val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("ERROR") 

    val accum = sc.accumulator(0, "My Accumulator") 
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 

    println("count:" + accum.value) 

    sc.stop 
    } 
} 

I erweitert App ersetzt gezeigt mit main (Methode) und es funktionierte in YARN Cluster in HDP 2.4 Objekt AccTest { def main (args: Array [String]): Einheit = {

val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client") 
val sc = new SparkContext(conf) 
sc.setLogLevel("ERROR") 

val accum = sc.accumulator(0, "My Accumulator") 
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 

println("count:" + accum.value) 

sc.stop 

}}

Verwandte Themen