2017-12-10 3 views
-1

I wie unten ein Datenrahmen haben:innerhalb einer Gruppe Operate durch und füllen Sie zusätzliche Spalten

+------+------+---+------+ 
|field1|field2|id |Amount| 
+------+------+---+------+ 
|A  |B  |002|10.0 | 
|A  |B  |003|12.0 | 
|A  |B  |005|15.0 | 
|C  |B  |002|20.0 | 
|C  |B  |003|22.0 | 
|C  |B  |005|25.0 | 
+------+------+---+------+ 

ich es umwandeln müssen:

+------+------+---+-------+---+-------+---+-------+ 
|field1|field2|002|002_Amt|003|003_Amt|005|005_Amt| 
+------+------+---+-------+---+-------+---+-------+ 
|A  |B  |002|10.0 |003|12.0 |005|15.0 | 
|C  |B  |002|20.0 |003|22.0 |005|25.0 | 
+------+------+---+-------+---+-------+---+-------+ 

Bitte geben!

Antwort

1

Ihre letzte dataframe Spalte hängt von id Spalte, so dass Sie die eindeutige ID s in einem separaten array speichern müssen.

import org.apache.spark.sql.functions._ 
val distinctIds = df.select(collect_list("id")).rdd.first().get(0).asInstanceOf[mutable.WrappedArray[String]].distinct 

Der nächste Schritt ist zu filter jeder der distinctIds und join sie

val first = distinctIds.head 
var finalDF = df.filter($"id" === first).withColumnRenamed("id", first).withColumnRenamed("Amount", first+"_Amt") 
for(str <- distinctIds.tail){ 
    var tempDF = df.filter($"id" === str).withColumnRenamed("id", str).withColumnRenamed("Amount", str+"_Amt") 
    finalDF = finalDF.join(tempDF, Seq("field1", "field2"), "left") 
} 
finalDF.show(false) 

Sie sollten Ihre gewünschte Ausgabe als

+------+------+---+-------+---+-------+---+-------+ 
|field1|field2|002|002_Amt|003|003_Amt|005|005_Amt| 
+------+------+---+-------+---+-------+---+-------+ 
|A  |B  |002|10.0 |003|12.0 |005|15.0 | 
|C  |B  |002|20.0 |003|22.0 |005|25.0 | 
+------+------+---+-------+---+-------+---+-------+ 

Var haben nie für scala empfohlen. So können Sie eine rekursive Funktion erstellen die obige Logik wie unten

def getFinalDF(first: Boolean, array: List[String], df: DataFrame, tdf: DataFrame) : DataFrame = array match { 
    case head :: tail => { 
    if(first) { 
     getFinalDF(false, tail, df, df.filter($"id" === head).withColumnRenamed("id", head).withColumnRenamed("Amount", head + "_Amt")) 
    } 
    else{ 
     val tempDF = df.filter($"id" === head).withColumnRenamed("id", head).withColumnRenamed("Amount", head+"_Amt") 
     getFinalDF(false, tail, df, tdf.join(tempDF, Seq("field1", "field2"), "left")) 
    } 
    } 
    case Nil => tdf 
} 

und rufen Sie die rekursive Funktion als

getFinalDF(true, distinctIds.toList, df, df).show(false) 

Sie sollten die gleiche Leistung zu tun haben.

Verwandte Themen