Ihre letzte dataframe
Spalte hängt von id
Spalte, so dass Sie die eindeutige ID s in einem separaten array
speichern müssen.
import org.apache.spark.sql.functions._
val distinctIds = df.select(collect_list("id")).rdd.first().get(0).asInstanceOf[mutable.WrappedArray[String]].distinct
Der nächste Schritt ist zu filter
jeder der distinctIds
und join
sie
val first = distinctIds.head
var finalDF = df.filter($"id" === first).withColumnRenamed("id", first).withColumnRenamed("Amount", first+"_Amt")
for(str <- distinctIds.tail){
var tempDF = df.filter($"id" === str).withColumnRenamed("id", str).withColumnRenamed("Amount", str+"_Amt")
finalDF = finalDF.join(tempDF, Seq("field1", "field2"), "left")
}
finalDF.show(false)
Sie sollten Ihre gewünschte Ausgabe als
+------+------+---+-------+---+-------+---+-------+
|field1|field2|002|002_Amt|003|003_Amt|005|005_Amt|
+------+------+---+-------+---+-------+---+-------+
|A |B |002|10.0 |003|12.0 |005|15.0 |
|C |B |002|20.0 |003|22.0 |005|25.0 |
+------+------+---+-------+---+-------+---+-------+
Var
haben nie für scala empfohlen. So können Sie eine rekursive Funktion erstellen die obige Logik wie unten
def getFinalDF(first: Boolean, array: List[String], df: DataFrame, tdf: DataFrame) : DataFrame = array match {
case head :: tail => {
if(first) {
getFinalDF(false, tail, df, df.filter($"id" === head).withColumnRenamed("id", head).withColumnRenamed("Amount", head + "_Amt"))
}
else{
val tempDF = df.filter($"id" === head).withColumnRenamed("id", head).withColumnRenamed("Amount", head+"_Amt")
getFinalDF(false, tail, df, tdf.join(tempDF, Seq("field1", "field2"), "left"))
}
}
case Nil => tdf
}
und rufen Sie die rekursive Funktion als
getFinalDF(true, distinctIds.toList, df, df).show(false)
Sie sollten die gleiche Leistung zu tun haben.