2017-06-01 2 views
0

Das Problem passierte, wenn ich versuche, meine gecachten Ergebnisse in einer Liste zu halten und versuchen, neue Datenrahmen von allen Daten aus der letzten Liste in jedem zu berechnen Iteration. Aber selbst wenn ich einen leeren Datenrahmen benutze und jedes Mal ein leeres Ergebnis erhalte, wird die Funktion nach ca. 8 ~ 12 Runden plötzlich sehr langsam.Spark Datarefame wird plötzlich sehr langsam, wenn ich die alten zwischengespeicherten Daten iterativ zu viel Zeit wiederverwenden

Hier ist mein Code

testLoop(Nil) 
def testLoop(lastDfList:List[DataFrame]){  
    // do some dummy transformation like union and cache the result 
    val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache   

    // always get 0, of course 
    println(resultDf.count) 

    // benchmark action 
    benchmark(resultDf.count)  

    testLoop(resultDf::lastDfList) 
} 

das Benchmark-Ergebnis 1~6 round : < 200ms 7 round : 367ms 8 round : 918ms 9 round : 2476ms 10 round : 7833ms 11 round : 24231ms

Ich glaube nicht, GC oder Block-Räumung das Problem in meinem Fall, da ich bereits einen leeren Datenrahmen verwenden, aber ich don Weiß ich nicht, was die Ursache ist? Verkenne ich die Bedeutung von Cache oder etwas anderes?

Dank!


Nach ImDarrenG Lösung zu lesen, änderte ich meinen Code, die folgenden sein:

spark.sparkContext.setCheckpointDir("/tmp") 

testLoop(Nil) 
def testLoop(lastDfList:List[DataFrame]){  
    // do some dummy transformation like union and cache the result 
    val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache   

    resultDf.checkpoint() 

    // always get 0, of course 
    println(resultDf.count) 

    // benchmark action 
    benchmark(resultDf.count)  

    testLoop(resultDf::lastDfList) 
} 

Aber es immer noch sehr langsam nach einigen Iterationen.

Antwort

1

Hier erstellen Sie eine Liste von DataFrames von resultDf bis Anfang lastDfList Zugabe und passieren, dass auf der nächsten Iteration von testLoop:

testLoop(resultDf::lastDfList) 

So lastDfList mehr jeden Pass bekommt.

Diese Linie schafft eine neue DataFrame durch union ing jedes Mitglied lastDfList:

val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf))}.cache 

Jedes Mitglied lastDfList ist eine Vereinigung von seinen Vorgängern daher Spark Aufrechterhaltung einer Linie, die bei jedem Durchlauf exponentiell größer wird von testLoop.

Ich erwarte, dass die Zunahme der Zeit durch die Haushaltung der DAG verursacht wird. Durch das Zwischenspeichern der Datenrahmen müssen keine Transformationen mehr durchgeführt werden. Die Lineage muss jedoch weiterhin von spark verwaltet werden.

Zwischengespeicherte Daten oder nein, es sieht so aus, als ob Sie eine wirklich komplexe DAG bauen, indem Sie jedes DataFrame mit allen seinen Vorgängern mit jedem Durchlauf von testLoop verbinden.

Sie könnten checkpoint verwenden, um die Herkunft zu trimmen und eine Überprüfung einzuführen, um eine unendliche Rekursion zu verhindern.

+0

Danke! Du gibst mir die richtige Richtung. Allerdings setze ich auch mein Checkpoint-Verzeichnis mit '' 'spark.sparkContext.setCheckpointDir ("/tmp ");' '' und füge resultDf.checkpoint() direkt nach '' 'val resultDf = lastDfList.foldLeft (...) {...}. Cache '' '. Das Problem ist immer noch da, es wird nach etwa 8 Iterationen unerträglich langsam werden. Ich kann diese Checkpoint-Daten in meinem Verzeichnis/tmp sehen, aber sie schienen nicht zu helfen. –

+0

Schauen Sie sich die Ausgabe von 'toDebugString' an, was Sie tun, erstellt eine sehr komplexe DAG.Verlangsamung wird angesichts der logischen Komplexität Ihres Codes erwartet. – ImDarrenG

+0

Ich weiß das Problem ist, dass meine DAG nach Zeiten der Iterationen zu komplex ist, aber das API-Dokument sagt, dass der Checkpoint verwendet werden kann, um den logischen Plan des Dataset abzuschneiden, also denke ich, dass es meiner Situation helfen kann. Allerdings ist es nicht :(, ich schaue mir die Informationen von debugString an und erkläre und sehe, dass die DAG nach jeder Iteration immer größer wird. –

0

Gemäß API und code gibt checkpoint einen neuen Datensatz zurück, anstatt den ursprünglichen Datensatz zu ändern.

+1

Während dieser Link die Frage beantworten kann, ist es besser, die wesentlichen Teile der Antwort hier aufzunehmen und den Link als Referenz zur Verfügung zu stellen. Nur-Link-Antworten können ungültig werden, wenn sich die verknüpfte Seite ändert. - [Aus Bewertung] (/ review/low-quality-posts/18342577) –