2017-09-07 5 views
0

Ich habe eine Schleife, die in jeder Iteration Zeilen generiert. Mein Ziel ist es, einen Datenrahmen mit einem gegebenen Schema zu erstellen, der nur diese Zeilen enthält. Ich habe eine Reihe von Schritten im Auge zu folgen, aber ich bin nicht in der Lage ein neues zu einem List[Row] in jeder Schleife IterationSpark - Erstellen eines DataFrame aus einer Liste von in einer Schleife generierten Zeilen

Ich versuche, den folgenden Ansatz hinzuzufügen:

var listOfRows = List[Row]() 

val dfToExtractValues: DataFrame = ??? 

dfToExtractValues.foreach { x => 

    //Not really important how to generate here the variables 
    //So to simplify all the rows will have the same values 

    var col1 = "firstCol" 
    var col2 = "secondCol" 
    var col3 = "thirdCol" 

    val newRow = RowFactory.create(col1,col2,col3) 

    //This step I am not able to do 
    //listOfRows += newRow  -> Just for strings 
    //listOfRows.add(newRow)  -> This add doesnt exist, it is a addString 
    //listOfRows.aggregate(1)(newRow)  -> This is not how aggreage works... 
} 


val rdd = sc.makeRDD[RDD](listOfRows) 

val dfWithNewRows = sqlContext.createDataFrame(rdd, myOriginalDF.schema) 

Kann mir jemand sagen, Was mache ich falsch, oder was könnte ich in meinem Ansatz ändern, um aus den von mir generierten Zeilen einen Datenrahmen zu generieren?

Vielleicht gibt es eine bessere Möglichkeit zum Sammeln der Zeilen anstelle von List [Row]. Aber dann muss ich diese andere Art von Sammlung in einen Datenrahmen umwandeln.

Antwort

1

Kann mir jemand sagen, was ich falsch

Closures tun:

Zunächst einmal ist es sieht aus wie Sie über Understanding Closures im Programmierhandbuch übersprungen. Jeder Versuch, mit dem Abschluss übergebene Variablen zu ändern, ist zwecklos. Alles was Sie tun können, ist eine Kopie zu modifizieren und Änderungen werden nicht global reflektiert.

Variable nicht Gegenstand wandelbar machen:

Nach

var listOfRows = List[Row]() 

eine Variable erzeugt. Assigned List ist so unveränderlich wie es war. Wenn es nicht in die Spark-Kontext war, konnte man einen neuen List erstellen und zuweisen:

listOfRows = newRow :: listOfRows 

Bitte beachte, dass wir perpend nicht anhängen - Sie wollen nicht auf die Liste in einer Schleife anzuhängen.

Variablen mit unveränderbaren Objekten sind nützlich, wenn Sie Daten gemeinsam nutzen möchten (dies ist beispielsweise in Akka üblich), haben aber nicht viele Anwendungen in Spark.

Halten Sie die Dinge verteilt:

Schließlich nie nur Daten für den Fahrer holen es wieder zu verteilen. Sie sollten auch unnötige Konvertierungen zwischen RDDs und DataFrames vermeiden. Am besten ist es DataFrame Betreiber den ganzen Weg zu verwenden:

dfToExtractValues.select(...) 

aber wenn Sie benötigen etwas komplexere map:

import org.apache.spark.sql.catalyst.encoders.RowEncoder 

dfToExtractValues.map(x => ...)(RowEncoder(schema)) 
Verwandte Themen