2016-10-27 3 views
0

Ich habe Datenrahmen, die Zeilen mit identischer ID enthält. Ich muss Union alle Zeilen mit der gleichen ID in einer Zeile (eine json)spark - Union Datenrahmen Zeilen in einer Zeile

Hier ist Beispiel für die Daten:

id first_name last_name 
1 JAMES   SMITH 
2 MARY   BROWN 
2 DAVID   WILLIAMS 
1 ROBERT  DAVIS 

das angeforderte Ergebnis ist:

{ 
    id:1, 
    entities: [{ 
    first_name:JAMES, 
    last_name:SMITH 
    }, { 
    first_name:ROBERT, 
    last_name:DAVIS 
    }] 
} 
{ 
    id:2, 
    entities: [{ 
    first_name:MARY, 
    last_name:BROWN 
    }, { 
    first_name:DAVID, 
    last_name:WILLIAMS 
    }] 
} 

Kann es getan werden?

Grüße, Yaniv

Antwort

1

Sie groupBy und collect_list verwenden können, nachdem Sie "merge", um die entsprechenden Spalten in einer einzigen, verschachtelte Struktur:

val input: DataFrame = Seq(
    (1, "JAMES", "SMITH"), 
    (2, "MARY", "BROWN"), 
    (2, "DAVID", "WILLIAMS"), 
    (1, "ROBERT", "DAVIS") 
).toDF("id", "first_name", "last_name") 

import org.apache.spark.sql.functions._ 
val result = input 
    .withColumn("entity", struct($"first_name", $"last_name")) 
    .groupBy("id").agg(collect_list($"entity")) 

result.show(false) 
// +---+--------------------------------+ 
// |id |entities      | 
// +---+--------------------------------+ 
// |1 |[[JAMES,SMITH], [ROBERT,DAVIS]] | 
// |2 |[[MARY,BROWN], [DAVID,WILLIAMS]]| 
// +---+--------------------------------+ 

result.printSchema() 
// root 
// |-- id: integer (nullable = false) 
// |-- entities: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- first_name: string (nullable = true) 
// | | |-- last_name: string (nullable = true) 
+0

nicht von 'collect_list'and ihre Nutzung bewusst haben, Vielen Dank. – Shankar

+0

Vielen Dank Tzach für Ihre Antwort, aber wenn ich versuche, den Code auszuführen, erhalte ich eine Ausnahme: 'AnalyseException: undefined Funktion collect_list' –

+0

Oh, ich denke, das heißt, Sie sollten' "org.apache.spark" %% "enthalten spark-hive "' in deinen Abhängigkeiten (zusätzlich zu '" org.apache.spark "%%" spark-sql "'), weil die Implementierung dieser Funktion in Sparks Hive-Unterstützung ... –

Verwandte Themen