2017-06-13 6 views
0

ich einen Datenrahmen haben unten enthält:Verschwenkung Datenrahmen - Spark-SQL

TradeId|Source 
ABC|"USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602" 

Ich möchte diese Daten schwenken, so dass es in unten dreht

TradeId|CCY|PV 
ABC|USD|333.123 
ABC|USD|-789.444 
ABC|GBP|1234.567 

Die Zahl der CCY | PV | Datum Triplets in der Spalte "Source" ist nicht festgelegt. Ich könnte es in ArrayList tun, aber das erfordert, die Daten in JVM zu laden und den ganzen Punkt von Spark zu besiegen.

Können sagen, meine Datenrahmen wie unten aussieht:

DataFrame tradesSnap = this.loadTradesSnap(reportRequest); 
String tempTable = getTempTableName(); 
tradesSnap.registerTempTable(tempTable); 
tradesSnap = tradesSnap.sqlContext().sql("SELECT TradeId, Source FROM " + tempTable); 

Antwort

1

Wenn Sie databricks pivot lesen, heißt es " A pivot is an aggregation where one (or more in the general case) of the grouping columns has its distinct values transposed into individual columns." Und das ist nicht das, was Sie sich wünschen Ich denke,

Ich würde vorschlagen, Sie withColumn und functions zu verwenden, um die endgültige Ausgabe bekommen Sie wünschen. Sie können so tun, unter Berücksichtigung folgender dataframe ist, was Sie

+-------+----------------------------------------------------------------+ 
|TradeId|Source               | 
+-------+----------------------------------------------------------------+ 
|ABC |USD,333.123,20170605|USD,-789.444,20170605|GBP,1234.567,20150602| 
+-------+----------------------------------------------------------------+ 

Sie haben können Sie wie folgt mit explode, split und withColumn die gewünschte Ausgabe

val explodedDF = dataframe.withColumn("Source", explode(split(col("Source"), "\\|"))) 
val finalDF = explodedDF.withColumn("CCY", split($"Source", ",")(0)) 
    .withColumn("PV", split($"Source", ",")(1)) 
    .withColumn("Date", split($"Source", ",")(2)) 
    .drop("Source") 

finalDF.show(false) 

Die endgültige Ausgabe zu erhalten, ist

+-------+---+--------+--------+ 
|TradeId|CCY|PV  |Date | 
+-------+---+--------+--------+ 
|ABC |USD|333.123 |20170605| 
|ABC |USD|-789.444|20170605| 
|ABC |GBP|1234.567|20150602| 
+-------+---+--------+--------+ 

Ich hoffe, dies löst Ihr Problem

+0

Ja, ich habe diese Option auch gefunden. Es ist einfacher zu verstehen und könnte es sogar zu meiner anfänglichen Auswahlabfrage hinzufügen. – Achilles

+0

Großartig das @Archilles zu hören. Und danke für die Annahme und Upvote :) –

2

Anstatt Schwenken, was Sie versuchen, sieht aus wie flatMap mehr zu erreichen.

Um es einfach auszudrücken, indem Sie flatMap auf einem Dataset verwenden, wenden Sie auf jede Zeile eine Funktion an (map), die selbst eine Sequenz von Zeilen erzeugen würde. Jede Reihe von Zeilen wird dann zu einer einzigen Sequenz verkettet (flat).

Das folgende Programm zeigt die Idee:

+-------+---+--------+--------+ 
|TradeId|CCY|  PV| Date| 
+-------+---+--------+--------+ 
| ABC|USD| 333.123|20170605| 
| ABC|USD|-789.444|20170605| 
| ABC|GBP|1234.567|20150602| 
+-------+---+--------+--------+ 

Sie können nun das Ergebnis nehmen und es in eine CSV speichern, wenn Sie:

import org.apache.spark.sql.SparkSession 

case class Input(TradeId: String, Source: String) 

case class Output(TradeId: String, CCY: String, PV: String, Date: String) 

object FlatMapExample { 

    // This function will produce more rows of output for each line of input 
    def splitSource(in: Input): Seq[Output] = 
    in.Source.split("\\|", -1).map { 
     source => 
     println(source) 
     val Array(ccy, pv, date) = source.split(",", -1) 
     Output(in.TradeId, ccy, pv, date) 
    } 

    def main(args: Array[String]): Unit = { 

    // Initialization and loading 
    val spark = SparkSession.builder().master("local").appName("pivoting-example").getOrCreate() 
    import spark.implicits._ 
    val input = spark.read.options(Map("sep" -> "|", "header" -> "true")).csv(args(0)).as[Input] 

    // For each line in the input, split the source and then 
    // concatenate each "sub-sequence" in a single `Dataset` 
    input.flatMap(splitSource).show 
    } 

} 

Ihre Eingabe gegeben, dies ist der Ausgang wäre wollen.