Ich habe einige Spark-Code, der eine Reihe von RDDs erstellt. Ganz am Ende rufe ich randomSplit
an, um es in 3 Sätze zu teilen, und dann schreibe ich jeden auf die Festplatte. So ist die erste Stufe ist:Warum Spark nicht parallelisieren randomSplit
- Erhalten Sie einige Daten
- Haben einige Transformationen
- Cache das Ergebnis
- Split, dass über
randomSplit
- schreiben alle Splits auf die Festplatte
Da Schritt (4) teilt die Dinge in 3 Sätze, es gibt 3 verschiedene Spark Phasen hier. Gegen Ende der 1. Stufe laufen beginnen wir 1 Aufgaben aus der Bühne, sondern haben Zieher zur Verfügung:
An diesem Punkt werden die Datensätze für mehrere Partitionen bereits berechnet worden. Wie ich es verstehe, randomSplit
läuft auf einer Partition von Partition Basis; Mit anderen Worten, es erfordert kein Shuffle oder ein Collect - es wählt einfach nach dem Zufallsprinzip Zeilen auf einer Partitionsbasis aus. Wenn das stimmt, gibt es keinen Grund, dass einige der Tasks von Stufe 2 nicht mit den verfügbaren Executoren ausgeführt werden konnten - die Partitionen für ihre RDDs wurden berechnet und zwischengespeichert. Warum startet Spark einige der Aufgaben der Stufe 2 nicht, um die verfügbaren Ressourcen zu nutzen?
Hinweis: Eindeutig eine "sie könnte, aber sie hat nicht" beantwortet hier ist absolut gültig. Ich denke, was ich wirklich frage ist, gibt es einen technischen Grund, an den ich nicht gedacht habe, macht dies unmöglich (oder sehr schwer) oder ist das nur ein Versehen in der Umsetzung?
Hier ist eine vereinfachte Version des Codes (in Kotlin):
fun run(sc: JavaSparkContext, opts: Options) {
val allData = fetchABunchOfData()
val allDataRdd = sc.parallelize(allData)
val taggedAndTokenized = allDataRdd.mapPartitions { addTagsAndTokens(it) }
// Convert each ResponseData to a JSON String
val jsonStrings = taggedAndTokenized.map {
val mapper = AnxJsonUtils.getMapper()
mapper.writeValueAsString(it)
}
// the randomSplit below creates 3 distinct RDD lineags so if we don't cache the parsing stuff we'll parse the
// entire document set twice.
jsonStrings.cache()
val trainValidTest =
jsonStrings.randomSplit(doubleArrayOf(opts.trainFrac, opts.validFrac, opts.testFrac), splitSeed)
trainValidTest[0].saveAsTextFile(opts.outPath + "/" + TRAIN_NAME)
trainValidTest[1].saveAsTextFile(opts.outPath + "/" + VALID_NAME)
trainValidTest[2].saveAsTextFile(opts.outPath + "/" + TEST_NAME)
}
Um zu klären, fragen Sie sich, warum die zweite 'saveAsTextFile' nicht startet, bevor die erste fertig ist? –