2017-02-05 7 views
1

Ich habe einige Spark-Code, der eine Reihe von RDDs erstellt. Ganz am Ende rufe ich randomSplit an, um es in 3 Sätze zu teilen, und dann schreibe ich jeden auf die Festplatte. So ist die erste Stufe ist:Warum Spark nicht parallelisieren randomSplit

  1. Erhalten Sie einige Daten
  2. Haben einige Transformationen
  3. Cache das Ergebnis
  4. Split, dass über randomSplit
  5. schreiben alle Splits auf die Festplatte

Da Schritt (4) teilt die Dinge in 3 Sätze, es gibt 3 verschiedene Spark Phasen hier. Gegen Ende der 1. Stufe laufen beginnen wir 1 Aufgaben aus der Bühne, sondern haben Zieher zur Verfügung:

enter image description here

An diesem Punkt werden die Datensätze für mehrere Partitionen bereits berechnet worden. Wie ich es verstehe, randomSplit läuft auf einer Partition von Partition Basis; Mit anderen Worten, es erfordert kein Shuffle oder ein Collect - es wählt einfach nach dem Zufallsprinzip Zeilen auf einer Partitionsbasis aus. Wenn das stimmt, gibt es keinen Grund, dass einige der Tasks von Stufe 2 nicht mit den verfügbaren Executoren ausgeführt werden konnten - die Partitionen für ihre RDDs wurden berechnet und zwischengespeichert. Warum startet Spark einige der Aufgaben der Stufe 2 nicht, um die verfügbaren Ressourcen zu nutzen?

Hinweis: Eindeutig eine "sie könnte, aber sie hat nicht" beantwortet hier ist absolut gültig. Ich denke, was ich wirklich frage ist, gibt es einen technischen Grund, an den ich nicht gedacht habe, macht dies unmöglich (oder sehr schwer) oder ist das nur ein Versehen in der Umsetzung?

Hier ist eine vereinfachte Version des Codes (in Kotlin):

fun run(sc: JavaSparkContext, opts: Options) { 
    val allData = fetchABunchOfData() 

    val allDataRdd = sc.parallelize(allData) 

    val taggedAndTokenized = allDataRdd.mapPartitions { addTagsAndTokens(it) } 

    // Convert each ResponseData to a JSON String 
    val jsonStrings = taggedAndTokenized.map { 
     val mapper = AnxJsonUtils.getMapper() 
     mapper.writeValueAsString(it) 
    } 

    // the randomSplit below creates 3 distinct RDD lineags so if we don't cache the parsing stuff we'll parse the 
    // entire document set twice. 
    jsonStrings.cache() 
    val trainValidTest = 
      jsonStrings.randomSplit(doubleArrayOf(opts.trainFrac, opts.validFrac, opts.testFrac), splitSeed) 
    trainValidTest[0].saveAsTextFile(opts.outPath + "/" + TRAIN_NAME) 
    trainValidTest[1].saveAsTextFile(opts.outPath + "/" + VALID_NAME) 
    trainValidTest[2].saveAsTextFile(opts.outPath + "/" + TEST_NAME) 
} 
+0

Um zu klären, fragen Sie sich, warum die zweite 'saveAsTextFile' nicht startet, bevor die erste fertig ist? –

Antwort

1

Aus einer Reihe von Gründen saveAsTextFile ist ein blockierender Aufruf. Dies bedeutet, dass der Spark-Master den zweiten Speicherbefehl erst erhält, wenn der erste abgeschlossen ist.

Das Gesagte, was Sie tun können, wenn Sie diese verfügbaren Ressourcen nutzen möchten, ist saveAsTextFile in drei separaten Threads aufrufen und auf ihre Zukunft warten. Sobald ein Worker seine Partitionen bei der ersten Aufgabe beendet hat, kann er bei der zweiten beginnen.

Verwandte Themen