2016-04-07 7 views
4

Wir verwenden schmelzen und dcast, um Daten aus Breit-> Lang und Lang-> Breitformat zu konvertieren. Weitere Informationen finden Sie unter http://seananderson.ca/2013/10/19/reshape.html.Unterstützt Funken unterstützt schmelzen und dcast

Entweder Scala oder SparkR ist in Ordnung.

Ich bin durch diese blog und scala functions und R API gegangen. Ich sehe keine Funktionen, die ähnliche Arbeit leistet.

Gibt es eine äquivalente Funktion in Spark? Wenn nicht, gibt es einen anderen Weg, es in Spark zu tun?

+0

Scheint nicht so. Wenn Sie Ihre Daten in den Speicher einpassen können, verwenden Sie 'as.data.frame()', um den Spark DataFrame in einen nativen data.frame zu konvertieren, umzuformen und in Spark zurückzuschreiben. – Thomas

+0

Weil es keine gibt. Sie müssen es selbst schreiben. – eliasah

Antwort

10

Reshaping Data with Pivot in Spark bietet Unterstützung für die Umformung mit pivot. Ich habe verstanden, melt ist in etwa die Rückseite des Pivot auch unpivot genannt. Ich bin relativ neu zu Spark. Mit meinem Wissen habe ich versucht, den Schmelzbetrieb zu implementieren.

def melt(df: DataFrame, columns: List[String]): DataFrame ={ 

    val restOfTheColumns = df.columns.filterNot(columns.contains(_)) 
    val baseDF = df.select(columns.head, columns.tail: _*) 
    val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true))) 
    var newdf = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure) 

    for(variableCol <- restOfTheColumns){ 
     val colValues = df.select(variableCol).map(r=> r(0).toString) 
     val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString))) 
     var colDF =sqlContext.createDataFrame(colRdd, newStructure) 
     newdf =newdf.unionAll(colDF) 
    } 
    newdf 
    } 

Es macht die Arbeit. Aber ich bin mir nicht sicher über die Effizienz.

+-----+---+---+----------+------+ 
| name|sex|age| street|weight| 
+-----+---+---+----------+------+ 
|Alice| f| 34| somewhere| 70| 
| Bob| m| 63| nowhere| -70| 
|Alice| f|612|nextstreet| 23| 
| Bob| m|612|  moon|  8| 
+-----+---+---+----------+------+ 

Kann

melt(df, List("name", "sex")) 

Das Ergebnis ist, wie unten als

verwendet werden:

+-----+---+--------+----------+ 
| name|sex|variable|  value| 
+-----+---+--------+----------+ 
|Alice| f|  age|  34| 
| Bob| m|  age|  63| 
|Alice| f|  age|  612| 
| Bob| m|  age|  612| 
|Alice| f| street| somewhere| 
| Bob| m| street| nowhere| 
|Alice| f| street|nextstreet| 
| Bob| m| street|  moon| 
|Alice| f| weight|  70| 
| Bob| m| weight|  -70| 
|Alice| f| weight|  23| 
| Bob| m| weight|   8| 
+-----+---+--------+----------+ 

Ich hoffe, dass es nützlich ist, und schätzen Sie Ihre Kommentare, wenn Raum für Verbesserungen gibt.

0

Hier ist ein spark.ml.Transformer, die gerade Dataset Manipulationen (keine RDD Sachen) verwendet

case class Melt(meltColumns: String*) extends Transformer{ 

    override def transform(in: Dataset[_]): DataFrame = { 
    val nonMeltColumns = in.columns.filterNot{ meltColumns.contains } 
    val newDS = in 
     .select(nonMeltColumns.head,meltColumns:_*) 
     .withColumn("variable", functions.lit(nonMeltColumns.head)) 
     .withColumnRenamed(nonMeltColumns.head,"value") 

    nonMeltColumns.tail 
     .foldLeft(newDS){ case (acc,col) => 
     in 
      .select(col,meltColumns:_*) 
      .withColumn("variable", functions.lit(col)) 
      .withColumnRenamed(col,"value") 
      .union(acc) 
     } 
     .select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*) 
    } 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    @DeveloperApi 
    override def transformSchema(schema: StructType): StructType = ??? 

    override val uid: String = Identifiable.randomUID("Melt") 
} 

Hier ist ein Test, der es

"spark" should "melt a dataset" in { 
    import spark.implicits._ 
    val schema = StructType(
     List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++ 
     Range(3,10).map{ i => StructField("name_"+i,DoubleType)}.toList) 

    val ds = Range(1,11) 
     .map{ i => Row("a" :: "b" :: Range(3,10).map{ j => Math.random() }.toList :_ *)} 
     .|>{ rows => spark.sparkContext.parallelize(rows) } 
     .|>{ rdd => spark.createDataFrame(rdd,schema) } 

    val newDF = ds.transform{ df => 
     Melt("Melt1","Melt2").transform(df) } 

    assert(newDF.count() === 70) 
    } 

verwendet |.> Dem Betreiber scalaZ Rohr ist

0

Funken DataFrame hat explode Methode, die R melt Funktionalität bietet. Beispiel, das in Spark 1.6.1 funktioniert:

// input df has columns (anyDim, n1, n2) 
case class MNV(measureName: String, measureValue: Integer); 
val dfExploded = df.explode(col("n1"), col("n2")) { 
    case Row(n1: Int, n2: Int) => 
    Array(MNV("n1", n1), MNV("n2", n2)) 
} 
// dfExploded has columns (anyDim, n1, n2, measureName, measureValue)