2017-05-18 9 views
1

Ich möchte eine Aktion für einen Treiber für jedes Element von RDD ohne Aufruf .collect() ausführen. Die erste Idee ist RDD.toLocalIterator() zu verwenden:RDD.toLocalIterator eifrige Bewertung

val config = new SparkConf().setMaster("local[10]").setAppName("xxx") 
val sc: SparkContext = new SparkContext(config) 
val ints: RDD[Int] = sc.parallelize(1 to 50) 
val doubled = ints.map(i => { 
    Thread.sleep(200) 
    println(s"map $i" + Thread.currentThread()) 
    i * 2 
}) 

doubled.toLocalIterator.foreach(i => { 
    println(s"got $i" + Thread.currentThread()) 
}) 

Aber in diesem Fall die Berechnung der nächsten Partition beginnt erst, nachdem die vorherige Partition raubend. Die gesamte Berechnung benötigt also zu viel Zeit. erfand ich den folgenden Hack:

doubled.cache() 
//force rdd to be materialized 
println(doubled.count()) 
//traverse cached rdd 
doubled.toLocalIterator.foreach(i => { 
    println(s"got $i" + Thread.currentThread()) 
}) 

Gibt es eine bessere Lösung?

Antwort

-1

Warum verwenden Sie nicht nur die RDD.foreach Methode Ich denke, das kann das gleiche wie Ihr Beispiel in der Frage geben. Mit diesem können Sie den Vorteil der Parallelverarbeitung Angebot von Funken RDD s

+0

'foreach' läuft auf Arbeiter, ich brauche eine Funktion auf einem Treiber – simpadjo

+0

ausführen, wenn Sie die Funktion auf Treiber ausführen, denke ich, dass Sie nicht profitieren können die Parallelverarbeitung von Spark –