2017-10-03 4 views
0

Also im Grunde möchte ich mehrere Aufgaben auf dem gleichen Knoten/Executor zum Lesen von Daten aus einem gemeinsamen Speicher ausführen. Dazu brauche ich eine Initialisierungsfunktion, die die Daten vor dem Start der Tasks in den Speicher lädt. Wenn Spark einen Hook für einen Executor-Start bereitstellt, könnte ich diesen Initialisierungscode in diese Rückruffunktion einfügen, wobei die Aufgaben nur ausgeführt werden, nachdem dieser Startvorgang abgeschlossen ist.Gibt es einen Haken für Executor Startup in Spark?

Also, meine Frage ist, bietet Spark solche Haken? Wenn nicht, mit welcher anderen Methode kann ich dasselbe erreichen?

Antwort

0

Sie müssen nicht mehrere Instanzen der App ausführen, um mehrere Tasks ausführen zu können (z. B. eine Anwendungsinstanz, eine Spark-Task). Das gleiche SparkSession-Objekt kann von mehreren Threads verwendet werden, um Spark-Aufgaben parallel zu übergeben.

So kann es wie folgt funktionieren:

  • Die Anwendung startet und betreibt eine Initialisierungsfunktion gemeinsam genutzte Daten in den Speicher zu laden. Sprich, in ein SharedData-Klassenobjekt.
  • SparkSession wird
  • erstellt
  • A Threadpool erstellt wird, wobei jeder Thread Zugriff auf (SparkSession, Shared) Objekte
  • Jeder Thread erstellt Funken Aufgabe Verwendung gemeinsamer SparkSession und Shared Objekte hat.
  • Je nach Anwendungsfall die Anwendung tut dann eine der folgenden:
    • wartet für alle Aufgaben zu erledigen und dann Session
    • wartet in einer Schleife schließt Funken für neue Anforderungen und schafft neue Spark ankommen Aufgaben wie erforderlich mit Threads aus dem Thread-Pool.

SparkContext (sparkSession.sparkContext) ist nützlich, wenn Sie pro Thread Dinge tun wollen wie eine Aufgabenbeschreibung Zuweisen einer Gruppe, um die Aufgabe setJobGroup so damit verbundenen Aufgaben können gleichzeitig abgebrochen mit setJobDescription oder Zuweisung verwenden cancelJobGroup. Sie können auch die Priorität für die Aufgaben optimieren, die den gleichen Pool verwenden, siehe https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application für Details.

+1

das ist alles wahr, aber es ist unmöglich, wenn "SharedData" nicht serialisierbar ist, und nicht zu effizient, wenn es serialisierbar aber groß ist.Die Verwendung von "SharedData", die in einer Treiberanwendung direkt in Spark-Transformationen erstellt wurden, würde bedeuten, dass es serialisiert und an den Executor ** pro Task ** gesendet würde. –

+0

@TzachZohar guter Punkt über SharedData an den Executor pro Aufgabe gesendet. Ja, die Verwendung einer Broadcast-Variable für SharedData würde dazu beitragen, dies zu vermeiden. Die Anforderung zur Serialisierung gilt jedoch auch für beide Variablen in Closure- und Broadcast-Variablen, nicht wahr? –

+1

Ja, Serialisierbarkeitsanforderung gilt auch für Broadcast; Aber nicht für die "statische" Initialisierungsoption, die ich auch erwähnt habe, was (wenn ich es richtig lese) das ist, was das OP anstrebt. –

0

Spark Lösung für „shared data“ wird mit Broadcast - wo Sie die Daten laden einmal im Treiber Anwendung und Spark-serialisiert und sendet an jeden der Testamentsvollstrecker (einmal). Wenn eine Aufgabe diese Daten verwendet, stellt Spark sicher, dass sie vorhanden ist, bevor die Aufgabe ausgeführt wird. Zum Beispiel:

object MySparkTransformation { 

    def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = { 
    val mySharedData: Map[String, Int] = loadDataOnce() 
    val broadcast = sc.broadcast(mySharedData) 
    rdd.map(r => broadcast.value(r)) 
    } 
} 

Alternativ, wenn Sie das Lesen der Daten in den Treiber Speicher vermeiden wollen und es über den Testamentsvollstrecker senden, können Sie lazy Werte in einer Scala object verwenden, um einen Wert zu erzeugen, die einmal bevölkert wird pro JVM, die in Sparks Fall einmal pro Executor ist. Zum Beispiel:

// must be an object, otherwise will be serialized and sent from driver 
object MySharedResource { 
    lazy val mySharedData: Map[String, Int] = loadDataOnce() 
} 

// If you use mySharedData in a Spark transformation, 
// the "local" copy in each executor will be used: 
object MySparkTransformation { 
    def transform(rdd: RDD[String]): RDD[Int] = { 
    // Spark won't include MySharedResource.mySharedData in the 
    // serialized task sent from driver, since it's "static" 
    rdd.map(r => MySharedResource.mySharedData(r)) 
    } 
} 

In der Praxis werden Sie eine Kopie mySharedData in jedem Testamentsvollstrecker haben.

+0

Ja, mir ist die Broadcast-Funktion bereits bekannt, aber der Grund, warum ich sie nicht verwenden möchte, ist, dass meine Aufgaben eine ausführbare Datei ausführen würden, der Code, der durch ein C-Programm kompiliert wird. Ich möchte diese Daten direkt aus einer HDFS-Datei laden und die Daten in einen gemeinsamen Speicher legen, damit sie von diesen Tasks verwendet werden können. Natürlich müsste ich auch den C-Code ein wenig modifizieren. Das Lazy-Val-Ding sieht für diesen Zweck passender aus. Also werde ich das überprüfen. – pythonic

Verwandte Themen