2016-05-03 10 views
7

Die Spark documentation zeigt, wie Sie einen DataFrame aus einer RDD erstellen und dabei Scala-Fallklassen zum Ableiten eines Schemas verwenden. Ich versuche, dieses Konzept mit sqlContext.createDataFrame(RDD, CaseClass) zu reproduzieren, aber mein DataFrame endet leer. Hier ist mein Scala Code:Wie konvertiert man eine fallklassenbasierte RDD in einen DataFrame?

// sc is the SparkContext, while sqlContext is the SQLContext. 

// Define the case class and raw data 
case class Dog(name: String) 
val data = Array(
    Dog("Rex"), 
    Dog("Fido") 
) 

// Create an RDD from the raw data 
val dogRDD = sc.parallelize(data) 

// Print the RDD for debugging (this works, shows 2 dogs) 
dogRDD.collect().foreach(println) 

// Create a DataFrame from the RDD 
val dogDF = sqlContext.createDataFrame(dogRDD, classOf[Dog]) 

// Print the DataFrame for debugging (this fails, shows 0 dogs) 
dogDF.show() 

Der Ausgang Ich sah mich ist:

Dog(Rex) 
Dog(Fido) 
++ 
|| 
++ 
|| 
|| 
++ 

Was bin ich?

Danke!

Antwort

12

Alles, was Sie brauchen, ist nur

val dogDF = sqlContext.createDataFrame(dogRDD) 

Zweiter Parameter Teil der Java-API und erwartet Sie Klasse folgt Java Beans Konvention (Getter/Setter) ist. Ihre Fallklasse folgt dieser Konvention nicht, sodass keine Eigenschaft erkannt wird, die zu leerem DataFrame ohne Spalten führt.

+1

Das funktionierte. Ich musste auch die Definition der Fallklasse außerhalb meiner Hauptfunktion verschieben, um 'Fehler zu vermeiden: Kein TypTag für Hund verfügbar '. Vielen Dank! – sparkour

+0

Ich sehe, sehr interessant, so dass der zweite Parameter nur immer beim Aufruf von der Java-API benötigt wird, wird scala nur automatisch die Felder des Typs erkennen, der in Spalten konvertiert werden soll? – qwwqwwq

5

Sie können eine DataFrame direkt von einem Seq von Fallklasseninstanzen erstellen toDF wie folgt verwendet:

Modus
val dogDf = Seq(Dog("Rex"), Dog("Fido")).toDF 
0

Fall Class-Ansatz wird nicht funktionieren in Cluster. Es wird ClassNotFoundException zu der Fallklasse geben, die Sie definierten.

es Wandle RDD[Row] und das Schema Ihrer RDD mit StructField definieren und dann createDataFrame wie

val rdd = data.map { attrs => Row(attrs(0),attrs(1)) } 

val rddStruct = new StructType(Array(StructField("id", StringType, nullable = true),StructField("pos", StringType, nullable = true))) 

sqlContext.createDataFrame(rdd,rddStruct) 

toDF() nicht funktionieren entweder

Verwandte Themen