2015-10-19 9 views
13

Ich benutze Spark SQL (ich erwähne, dass es in Spark ist, falls das die SQL-Syntax betrifft - ich bin nicht vertraut genug, um sicher zu sein) und ich habe eine Tabelle, die ich neu strukturieren möchte, aber ich bleibe stecken und versuche, mehrere Spalten gleichzeitig zu transponieren.Explode (Transponieren?) Mehrere Spalten in Spark SQL Tabelle

Grundsätzlich ich Daten haben, die wie folgt aussieht:

userId someString  varA  varB 
    1  "example1" [0,2,5] [1,2,9] 
    2  "example2" [1,20,5] [9,null,6] 

und ich möchte beide varA und varB gleichzeitig explodieren (die Länge wird immer konsistent sein) -, so dass die endgültige Ausgabe sieht wie folgt aus:

userId someString  varA  varB 
    1  "example1"  0   1 
    1  "example1"  2   2 
    1  "example1"  5   9 
    2  "example2"  1   9 
    2  "example2"  20  null 
    2  "example2"  5   6 

aber ich kann nur scheinen eine einzige Explosion (var) Erklärung zu bekommen in einem Befehl zu arbeiten, und wenn ich an die Kette versuchen, sie (dh eine temporäre Tabelle erstellen, nachdem der erste Befehl explodieren) dann bekomme ich natürlich ein riesige Anzahl von doppelten, unnötigen Zeilen.

Vielen Dank!

Antwort

21

Was Sie wollen, ist ohne benutzerdefinierte UDF nicht möglich. In Scala kann man so etwas tun:

import org.apache.spark.sql.functions.{udf, explode} 

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
    $"userId", $"someString", 
    $"vars._1".alias("varA"), $"vars._2".alias("varB")).show 

// +------+----------+----+----+ 
// |userId|someString|varA|varB| 
// +------+----------+----+----+ 
// |  1| example1| 0| 1| 
// |  1| example1| 2| 2| 
// |  1| example1| 5| 9| 
// |  2| example2| 1| 9| 
// |  2| example2| 20|null| 
// |  2| example2| 5| 6| 
// +------+----------+----+----+ 

Mit rohen SQL:

val data = sc.parallelize(Seq(
    """{"userId": 1, "someString": "example1", 
     "varA": [0, 2, 5], "varB": [1, 2, 9]}""", 
    """{"userId": 2, "someString": "example2", 
     "varA": [1, 20, 5], "varB": [9, null, 6]}""" 
)) 

val df = sqlContext.read.json(data) 

df.printSchema 
// root 
// |-- someString: string (nullable = true) 
// |-- userId: long (nullable = true) 
// |-- varA: array (nullable = true) 
// | |-- element: long (containsNull = true) 
// |-- varB: array (nullable = true) 
// | |-- element: long (containsNull = true) 

Jetzt können wir zip UDF definieren

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 
df.registerTempTable("df") 

sqlContext.sql(
    """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""") 
+0

Kann dies auf drei Säulen aufgetragen werden, die von Typenfolge? –

+0

@AmitKumar Ja, warum nicht? Sie müssen Unterschrift und Körper anpassen, aber es ist nicht schwer. – zero323

+0

Ich frage mich, ob in der neueren Dataset-API könnten Sie einfach Map und Zip die Arrays zusammen, ohne die UDF zu erstellen und ob es schneller/Maßstab/von der Katalysator-Ausführung-Engine optimiert werden würde. Ich werde es versuchen, wenn an einer Konsole. – Davos