Spark Lösung für „shared data“ wird mit Broadcast - wo Sie die Daten laden einmal im Treiber Anwendung und Spark-serialisiert und sendet an jeden der Testamentsvollstrecker (einmal). Wenn eine Aufgabe diese Daten verwendet, stellt Spark sicher, dass sie vorhanden ist, bevor die Aufgabe ausgeführt wird. Zum Beispiel:
object MySparkTransformation {
def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = {
val mySharedData: Map[String, Int] = loadDataOnce()
val broadcast = sc.broadcast(mySharedData)
rdd.map(r => broadcast.value(r))
}
}
Alternativ, wenn Sie das Lesen der Daten in den Treiber Speicher vermeiden wollen und es über den Testamentsvollstrecker senden, können Sie lazy
Werte in einer Scala object
verwenden, um einen Wert zu erzeugen, die einmal bevölkert wird pro JVM, die in Sparks Fall einmal pro Executor ist. Zum Beispiel:
// must be an object, otherwise will be serialized and sent from driver
object MySharedResource {
lazy val mySharedData: Map[String, Int] = loadDataOnce()
}
// If you use mySharedData in a Spark transformation,
// the "local" copy in each executor will be used:
object MySparkTransformation {
def transform(rdd: RDD[String]): RDD[Int] = {
// Spark won't include MySharedResource.mySharedData in the
// serialized task sent from driver, since it's "static"
rdd.map(r => MySharedResource.mySharedData(r))
}
}
In der Praxis werden Sie eine Kopie mySharedData
in jedem Testamentsvollstrecker haben.
das ist alles wahr, aber es ist unmöglich, wenn "SharedData" nicht serialisierbar ist, und nicht zu effizient, wenn es serialisierbar aber groß ist.Die Verwendung von "SharedData", die in einer Treiberanwendung direkt in Spark-Transformationen erstellt wurden, würde bedeuten, dass es serialisiert und an den Executor ** pro Task ** gesendet würde. –
@TzachZohar guter Punkt über SharedData an den Executor pro Aufgabe gesendet. Ja, die Verwendung einer Broadcast-Variable für SharedData würde dazu beitragen, dies zu vermeiden. Die Anforderung zur Serialisierung gilt jedoch auch für beide Variablen in Closure- und Broadcast-Variablen, nicht wahr? –
Ja, Serialisierbarkeitsanforderung gilt auch für Broadcast; Aber nicht für die "statische" Initialisierungsoption, die ich auch erwähnt habe, was (wenn ich es richtig lese) das ist, was das OP anstrebt. –