2016-05-03 9 views
7

Ich habe mit der Konvertierung von RDDs zu DataFrames und wieder zurück gespielt. Zuerst hatte ich eine RDD vom Typ (Int, Int) namens DataPair. Dann habe ich ein Objekt mit Datenrahmen unter Verwendung von Spaltenüberschriften:So konvertieren Sie eine RDD [Row] zurück zu DataFrame

val dataFrame = dataPair.toDF(header(0), header(1)) 

Dann habe ich es von einem Datenrahmen zurück zu einem RDD umgewandelt werden:

val testRDD = dataFrame.rdd 

die eine RDD vom Typ org.apache.spark zurückgibt. sql.Row (nicht (Int, Int)). Dann würde Ich mag es konvertieren zurück zu einem RDD .toDF verwenden, aber ich erhalte eine Fehlermeldung:

error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 

Ich habe versucht, ein Schema des Datentypen definieren (Int, Int) für testRDD, aber ich Art bekommen Mismatch Ausnahmen:

error: type mismatch; 
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 
required: org.apache.spark.rdd.RDD[Data] 
    val testRDD: RDD[Data] = dataFrame.rdd 
            ^

ich bereits

importiert haben
import sqlContext.implicits._ 

Antwort

13

Um einen Datenrahmen von einer RDD der Zeilen zu erstellen, in der Regel haben Sie zwei Möglichkeiten:

1) Sie können verwenden, die von importiert werden können. Doch dieser Ansatz funktioniert nur für die folgenden Arten von RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(Quelle: Scaladoc des SQLContext.implicits Objekt)

Die las t-Signatur bedeutet tatsächlich, dass es für eine RDD von Tupeln oder eine RDD von Fallklassen arbeiten kann (weil Tupel und Fallklassen Subklassen von scala.Product sind).

Also, um diesen Ansatz für eine RDD[Row] zu verwenden, müssen Sie es auf eine RDD[T <: scala.Product] zuordnen. Dies kann durch Abbilden jede Zeile zu einer benutzerdefinierten Fallklasse oder zu einem Tupel durchgeführt werden, wie in dem folgenden Code-Schnipsel:

val df = rdd.map({ 
    case Row(val1: String, ..., valN: Long) => (val1, ..., valN) 
}).toDF("col1_name", ..., "colN_name") 

oder

case class MyClass(val1: String, ..., valN: Long = 0L) 
val df = rdd.map({ 
    case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN) 
}).toDF("col1_name", ..., "colN_name") 

Der größte Nachteil dieses Ansatzes (meiner Meinung nach) ist, dass Sie das Schema des resultierenden DataFrames in der Map-Funktion spaltenweise explizit festlegen müssen. Vielleicht kann dies programmatisch geschehen, wenn Sie das Schema nicht im Voraus kennen, aber die Dinge können dort etwas unordentlich werden. Also, alternativ gibt es eine weitere Option:


2) Sie können createDataFrame(rowRDD: RDD[Row], schema: StructType) verwenden, die im SQLContext Objekt verfügbar ist.Beispiel:

val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema) 

Beachten Sie, dass keine Schema-Spalte explizit festgelegt werden muss. Wir verwenden das alte DF-Schema der Klasse StructType und können einfach erweitert werden. Dieser Ansatz ist jedoch manchmal nicht möglich und kann in einigen Fällen weniger effizient sein als der erste.

Ich hoffe, es ist klarer als zuvor. Prost.

+0

Ich fand es heraus, ich musste auf das Datenschema mit dem folgenden zuordnen: 'val df = testRDD.map {Fall Row (n1: Int, n2: Int) => Daten (n1, n2)}. ToDF() ' – TheElysian

+0

Schön, es ist in der Tat eine Option. Die Lösung mit createDataFrame ist jedoch generischer und ermöglicht die Konvertierung, selbst wenn Sie nicht wissen, wie viele Felder der ursprüngliche Datenrahmen hat. –

+0

Ich habe versucht, es zu verwenden, aber ich habe immer Fehler beim Überlauf der CreateDataFrame-Methode erhalten. Danke trotzdem. – TheElysian

Verwandte Themen