2014-10-14 4 views
15

Ich habe Funken an der Dokumentation gesucht und es erwähnt dies:Was ist der richtige Weg, um ein statisches Objekt auf allen Arbeitnehmern haben

Spark API stützt sich stark auf Funktionen im Treiberprogramm vorbei zu auf dem Cluster ausführen. Es gibt zwei empfohlene Methoden:

Anonyme Funktionssyntax, die für kurze Codeabschnitte verwendet werden kann. Statische Methoden in einem globalen Singleton-Objekt. Zum Beispiel können Sie Objekt MyFunctions definieren und dann MyFunctions.func1 passieren, wie folgt:

object MyFunctions { def func1(s: String): String = { ... } } 

myRdd.map(MyFunctions.func1) 

Beachten Sie, dass, während es auch möglich, ist eine Referenz auf eine Methode in einer Klasse-Instanz übergeben (Im Gegensatz zu einem Singleton-Objekt) erfordert dies das Senden des Objekts, das die Klasse zusammen mit der Methode enthält. Betrachten wir zum Beispiel:

class MyClass { 
    def func1(s: String): String = { ... } 
    def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } 
} 

Hier wird, wenn wir eine neue MyClass und rufen doStuff darauf, die Karte im Inneren gibt verweist auf die func1 Methode dieses MyClass Instanz, so dass die gesamte Objekt Bedürfnisse erstellen an den Cluster gesendet werden. Es ähnelt dem Schreiben rdd.map(x => this.func1(x)).

Jetzt ist mein Zweifel, was passiert, wenn Sie Attribute auf dem Singleton-Objekt haben (die statisch äquivalent sein sollen). Dasselbe Beispiel mit einer kleinen Änderung:

object MyClass { 
    val value = 1 
    def func1(s: String): String = { s + value } 
} 

myRdd.map(MyClass.func1) 

So ist die Funktion noch statisch verwiesen, aber wie Funken weit geht geht indem man versucht, alle referenzierten Variablen zu serialisiert? Wird es value serialisiert oder wird es in den Remote-Arbeitern wieder initialisiert?

Darüber hinaus ist dies alles in dem Zusammenhang, dass ich einige schwere Modelle in einem Singleton-Objekt habe und ich möchte die richtige Möglichkeit finden, sie an Arbeiter zu serialisieren, während die Fähigkeit, sie aus dem Singleton überall statt zu referenzieren übergibt sie als Funktionsparameter über einen Call-Stack mit ziemlich tiefer Funktion.

Jede eingehende Informationen über was/wie/wann Spark Daten serialisieren würde geschätzt.

Antwort

13

Dies ist weniger eine Frage über Spark und mehr eine Frage, wie Scala Code generiert. Denken Sie daran, dass eine Scala object eine Java-Klasse voller statischer Methoden ist. Betrachten Sie ein einfaches Beispiel wie folgt:

object foo { 

    val value = 42 

    def func(i: Int): Int = i + value 

    def main(args: Array[String]): Unit = { 
    println(Seq(1, 2, 3).map(func).sum) 
    } 

} 

Das wird in 3 Java-Klassen übersetzt werden; Einer von ihnen wird die Schließung, die ein Parameter für die map Methode ist. Mit javap auf dieser Klasse Ausbeuten etwas wie folgt aus:

public final class foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable { 
    public static final long serialVersionUID; 
    public final int apply(int); 
    public int apply$mcII$sp(int); 
    public final java.lang.Object apply(java.lang.Object); 
    public foo$$anonfun$main$1(); 
} 

Hinweis gibt es keine Felder oder nichts. Wenn Sie sich den disassemblierten Bytecode ansehen, rufen Sie nur die Methode func() auf.Wenn es in Spark läuft, ist dies die Instanz, die serialisiert wird; Da es keine Felder gibt, gibt es nicht viel zu serialisieren.

Für Ihre Frage, wie Sie statische Objekte initialisieren, können Sie eine idempotente Initialisierungsfunktion haben, die Sie zu Beginn Ihrer Schließungen aufrufen. Die erste löst die Initialisierung aus, die nachfolgenden Aufrufe sind No-Ops. Die Bereinigung ist jedoch sehr viel schwieriger, da ich mit einer API, die so etwas wie "Führe diesen Code auf allen Executoren aus" nicht kenne.

Ein Ansatz, der nützlich sein kann, wenn Sie Bereinigung benötigen, wird in this blog im Abschnitt "setup() und Bereinigung()" erläutert.

EDIT: nur zur Verdeutlichung, hier ist die Demontage der Methode, die tatsächlich den Anruf macht.

public int apply$mcII$sp(int); 
    Code: 
    0: getstatic  #29; //Field foo$.MODULE$:Lfoo$; 
    3: iload_1 
    4: invokevirtual #32; //Method foo$.func:(I)I 
    7: ireturn 

Sehen Sie, wie es verweist nur das statische Feld die Singleton und ruft die func() Methode zu halten.

+1

Da die Anfangswerte von statischen Feldern in einer Java-Klasse im Jar gespeichert sind, können Worker sie direkt verwenden. Wenn Sie jedoch den Wert eines statischen Felds im Treiber ändern, können Worker nur den Anfangswert anzeigen. Bitte testen Sie es nicht im lokalen Modell, da der lokale Modus denselben Prozess wieder verwendet. – zsxwing

+0

Das ist sehr aufschlussreich, aber ich bin immer noch verwirrt. Singletons in Scala sollen in Ihrem Beispiel eine Instanzvariable erzeugen (http://stackoverflow.com/questions/5721046/singletons-as-synthetic-classes-in-scala). Irgendwelche solche Referenzkräfte Spark ["so dass das ganze Objekt an den Cluster gesendet werden muss"] (https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark). Außerdem habe ich erwähnt, dass [Spark ein paar Extras macht] (https://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark) von dem, was von der Scala erwartet werden kann Compiler: -S –

+0

Singletons in Scala werden immer noch in einer statischen Variable gehalten (in meinem Beispiel wäre das "foo $ .MODULE $"), und statische Variablen werden niemals serialisiert. Spark hat Code, um Schließungen zu bereinigen, aber das gilt nicht für Singletons, da es in diesem Fall keine Serialisierung gibt. – vanza

Verwandte Themen