Ich möchte eine Aktion für einen Treiber für jedes Element von RDD ohne Aufruf .collect()
ausführen. Die erste Idee ist RDD.toLocalIterator()
zu verwenden:RDD.toLocalIterator eifrige Bewertung
val config = new SparkConf().setMaster("local[10]").setAppName("xxx")
val sc: SparkContext = new SparkContext(config)
val ints: RDD[Int] = sc.parallelize(1 to 50)
val doubled = ints.map(i => {
Thread.sleep(200)
println(s"map $i" + Thread.currentThread())
i * 2
})
doubled.toLocalIterator.foreach(i => {
println(s"got $i" + Thread.currentThread())
})
Aber in diesem Fall die Berechnung der nächsten Partition beginnt erst, nachdem die vorherige Partition raubend. Die gesamte Berechnung benötigt also zu viel Zeit. erfand ich den folgenden Hack:
doubled.cache()
//force rdd to be materialized
println(doubled.count())
//traverse cached rdd
doubled.toLocalIterator.foreach(i => {
println(s"got $i" + Thread.currentThread())
})
Gibt es eine bessere Lösung?
'foreach' läuft auf Arbeiter, ich brauche eine Funktion auf einem Treiber – simpadjo
ausführen, wenn Sie die Funktion auf Treiber ausführen, denke ich, dass Sie nicht profitieren können die Parallelverarbeitung von Spark –