2015-08-06 14 views
6

Ich versuche, einige Analysen auf Sets zu tun. Ich habe einen Beispieldatensatz, der wie folgt aussieht:Spark Datengruppen groupby in Liste

orders.json

{"items":[1,2,3,4,5]} 
{"items":[1,2,5]} 
{"items":[1,3,5]} 
{"items":[3,4,5]} 

Alles, was es ist, ist ein einzelnes Feld, das eine Liste von Zahlen ist, die IDs darstellen.

Hier ist der Spark-Skript, das ich zu laufen versuche:

val sparkConf = new SparkConf() 
    .setMaster("local[*]") 
    .setAppName("Dataframe Test") 

val sc = new SparkContext(sparkConf) 
val sql = new SQLContext(sc) 

val dataframe = sql.read.json("orders.json") 

val expanded = dataframe 
    .explode[::[Long], Long]("items", "item1")(row => row) 
    .explode[::[Long], Long]("items", "item2")(row => row) 

val grouped = expanded 
    .where(expanded("item1") !== expanded("item2")) 
    .groupBy("item1", "item2") 
    .count() 

val recs = grouped 
    .groupBy("item1") 

Erstellen expanded und grouped in Ordnung ist, auf den Punkt gebracht expanded eine Liste aller möglichen Sätze von zwei IDs, wo die beiden IDs waren in das gleiche originale Set. grouped filtert die mit ihnen übereinstimmenden IDs aus, gruppiert dann alle eindeutigen ID-Paare und erstellt für jede eine Anzahl. Das Schema und Datenprobe von grouped sind:

root 
|-- item1: long (nullable = true) 
|-- item2: long (nullable = true) 
|-- count: long (nullable = false) 

[1,2,2] 
[1,3,2] 
[1,4,1] 
[1,5,3] 
[2,1,2] 
[2,3,1] 
[2,4,1] 
[2,5,2] 
... 

Also, meine Frage ist: Wie ich jetzt Gruppe auf dem ersten Punkt in jedem Ergebnis so, dass ich eine Liste von Tupeln habe? Für die Beispieldaten über, würde ich so etwas wie dies erwarten:

[1, [(2, 2), (3, 2), (4, 1), (5, 3)]] 
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]] 

Wie Sie in meinem Skript mit recs sehen kann, dachte ich Sie, indem Sie eine groupBy auf beginnen würde ‚item1‘, die das erste Element ist in jede Reihe. Danach bleibt jedoch dieses GroupedData-Objekt übrig, das sehr begrenzte Aktionen enthält. Wirklich, Sie bleiben nur bei Aggregationen wie sum, avg, etc. Ich möchte nur die Tupel aus jedem Ergebnis auflisten.

Ich könnte RDD-Funktionen an dieser Stelle leicht verwenden, aber das geht von der Verwendung von Dataframes. Gibt es eine Möglichkeit, dies mit den Datenrahmenfunktionen zu tun.

Antwort

5

können Sie bauen, dass mit org.apache.spark.sql.functions (collect_list und struct) verfügbar seit 1,6

val recs =grouped.groupBy('item1).agg(collect_list(struct('item2,'count)).as("set")) 


+-----+----------------------------+ 
|item1|set       | 
+-----+----------------------------+ 
|1 |[[5,3], [4,1], [3,2], [2,2]]| 
|2 |[[4,1], [1,2], [5,2], [3,1]]| 
+-----+----------------------------+ 

Sie collect_set auch

bearbeiten können: für Informationen, tuples existieren nicht in Datenrahmen. Die nächstgelegenen Strukturen sind struct, da sie den Fallklassen in der API-API ohne Typ entsprechen.

Verwandte Themen