Das Problem passierte, wenn ich versuche, meine gecachten Ergebnisse in einer Liste zu halten und versuchen, neue Datenrahmen von allen Daten aus der letzten Liste in jedem zu berechnen Iteration. Aber selbst wenn ich einen leeren Datenrahmen benutze und jedes Mal ein leeres Ergebnis erhalte, wird die Funktion nach ca. 8 ~ 12 Runden plötzlich sehr langsam.Spark Datarefame wird plötzlich sehr langsam, wenn ich die alten zwischengespeicherten Daten iterativ zu viel Zeit wiederverwenden
Hier ist mein Code
testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){
// do some dummy transformation like union and cache the result
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache
// always get 0, of course
println(resultDf.count)
// benchmark action
benchmark(resultDf.count)
testLoop(resultDf::lastDfList)
}
das Benchmark-Ergebnis 1~6 round : < 200ms 7 round : 367ms 8 round : 918ms 9 round : 2476ms 10 round : 7833ms 11 round : 24231ms
Ich glaube nicht, GC oder Block-Räumung das Problem in meinem Fall, da ich bereits einen leeren Datenrahmen verwenden, aber ich don Weiß ich nicht, was die Ursache ist? Verkenne ich die Bedeutung von Cache oder etwas anderes?
Dank!
Nach ImDarrenG Lösung zu lesen, änderte ich meinen Code, die folgenden sein:
spark.sparkContext.setCheckpointDir("/tmp")
testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){
// do some dummy transformation like union and cache the result
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache
resultDf.checkpoint()
// always get 0, of course
println(resultDf.count)
// benchmark action
benchmark(resultDf.count)
testLoop(resultDf::lastDfList)
}
Aber es immer noch sehr langsam nach einigen Iterationen.
Danke! Du gibst mir die richtige Richtung. Allerdings setze ich auch mein Checkpoint-Verzeichnis mit '' 'spark.sparkContext.setCheckpointDir ("/tmp ");' '' und füge resultDf.checkpoint() direkt nach '' 'val resultDf = lastDfList.foldLeft (...) {...}. Cache '' '. Das Problem ist immer noch da, es wird nach etwa 8 Iterationen unerträglich langsam werden. Ich kann diese Checkpoint-Daten in meinem Verzeichnis/tmp sehen, aber sie schienen nicht zu helfen. –
Schauen Sie sich die Ausgabe von 'toDebugString' an, was Sie tun, erstellt eine sehr komplexe DAG.Verlangsamung wird angesichts der logischen Komplexität Ihres Codes erwartet. – ImDarrenG
Ich weiß das Problem ist, dass meine DAG nach Zeiten der Iterationen zu komplex ist, aber das API-Dokument sagt, dass der Checkpoint verwendet werden kann, um den logischen Plan des Dataset abzuschneiden, also denke ich, dass es meiner Situation helfen kann. Allerdings ist es nicht :(, ich schaue mir die Informationen von debugString an und erkläre und sehe, dass die DAG nach jeder Iteration immer größer wird. –