2017-01-12 8 views
0

ich einen Datenrahmen mit 6 Spalten wie diese:Wie Spark Datenrahmen, um verschachtelten Datenrahmen konvertieren

df.printSchema 
root 
|-- d1: string (nullable = true) 
|-- d2: string (nullable = true) 
|-- d3: string (nullable = true) 
|-- m1: string (nullable = true) 
|-- m2: string (nullable = true) 
|-- m3: string (nullable = true) 

Aus irgendwelchen Gründen, würde Ich mag es konvertieren diese zu mögen:

root 
|-- d1: string (nullable = true) 
|-- d2: string (nullable = true) 
|-- d3: string (nullable = true) 
|-- metric: nested 
    |-- m1: string (nullable = true) 
    |-- m2: string (nullable = true) 
    |-- m3: string (nullable = true) 

Ich verbrachte Stunden, aber ich kann es nicht herausfinden. Was ich bisher gemacht habe, ist unter

case class Metric(m1: String, m2: String, m3: String) 
case class Dimension(d1: String, d2: String, d3: String, metric: Metric) 

scala> df.map(row => Dimension(row.getAs[String]("d1"), 
    | row.getAs[String]("d2"), 
    | row.getAs[String]("d3"), 
    | Metric(row.getAs[String]("m1"), 
    |  row.getAs[String]("m2"), 
    |  row.getAs[String]("m3")))) 
res48: org.apache.spark.rdd.RDD[Dimension] = MapPartitionsRDD[32] at map at <console>:46 

scala> df.map(row => Dimension(row.getAs[String]("d1"), 
    | row.getAs[String]("d2"), 
    | row.getAs[String]("d3"), 
    | Metric(row.getAs[String]("m1"), 
    |  row.getAs[String]("m2"), 
    |  row.getAs[String]("m3")))).collect().foreach(println) 
WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 220, hostname): java.lang.ClassNotFoundException: $line55.$read$$iwC$$iwC$Dimension 

scala> df.map(row => Dimension(row.getAs[String]("d1"), 
    | row.getAs[String]("d2"), 
    | row.getAs[String]("d3"), 
    | Metric(row.getAs[String]("m1"), 
    |  row.getAs[String]("m2"), 
    |  row.getAs[String]("m3")))).toDF 
res50: org.apache.spark.sql.DataFrame = [d1: string, d2: string, d3: string, metric: struct<m1:string,m2:string,m3:string>] 

scala> df.map(row => Dimension(row.getAs[String]("d1"), 
    | row.getAs[String]("d2"), 
    | row.getAs[String]("d3"), 
    | Metric(row.getAs[String]("m1"), 
    |  row.getAs[String]("m2"), 
    |  row.getAs[String]("m3")))).toDF.select("d1").show() 
ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerSQLExecutionStart(1,show at <console>:51,org.apache.spark.sql.DataFrame.show(DataFrame.scala:319) 

Bitte, hilf mir. Vielen Dank.

Antwort

2

Erforderliche Importe:

// SQLContext in Spark 1.x 
val spark: SparkSession = ??? 

import org.apache.spark.sql.functions.struct 
import spark.implicits._ 

import sqlContext.implicits._ // Spark 1.x 

Einfach wählen:

df.select($"d1", $"d2", $"d3", struct($"m1", $"m2", $"m3").alias("metrics")) 

von (Spark-2.x) gefolgt:

.as[Dimension] 

, wenn Sie eine statisch Dataset[Dimension] statt DataFrame wollen.

Verwandte Themen