2016-12-19 3 views
0

Wenn ich eine Gruppe der verschiedenen Elemente, die wie folgt aussehenSpark-SQL finden häufigste Element in einer Gruppe

case class Order(orderId: String, orderDetails: OrderDetail, destination: String) 
case class OrderDetail(date: Timestamp, recipient: String, item: String) 

grouped = ordersDF.groupby($"destination") 

Was Im suchen, ist eine Möglichkeit, die häufigste Element von jedem Ziel auf der Suche zu finden Die SQL-Funktionen für die Aggregation, ich sehe nichts für UntergruppeBys auf den Daten. Die Daten in eine RDD umzuwandeln, kann funktionieren, aber mein Verständnis ist, dass es nicht die besten Praktiken sind.

Ich möchte etwas sehen, wie

|Destination | mostCommon | 
---------------------------- 
|XYZ   |item x  | 

Antwort

2

du eine Kombination aus groupBy/Aggregatfunktionen und Fensterfunktionen erreichen könnten.

Lassen Sie uns dies nach Ansicht der ordersDf sein:

+-------+--------------+-----------+ 
|orderId| orderDetails|destination| 
+-------+--------------+-----------+ 
|  1|[11,abc,item1]|  loc1| 
|  2|[12,abc,item2]|  loc1| 
|  3|[13,abc,item1]|  loc1| 
|  4|[14,abc,item1]|  loc2| 
|  5|[15,abc,item2]|  loc2| 
|  6|[11,abc,item2]|  loc2| 
|  7|[11,abc,item2]|  loc2| 
+-------+--------------+-----------+ 

Zuerst Gruppe die Daten nach Ziel und Gegenstand und die Häufigkeit der einzelnen Elemente zu zählen.

val dfWithCount = ordersDf 
.groupBy("destination","orderDetails.item") 
.agg(count("orderDetails.item").alias("itemCount")) 

Die aggregierten Datenrahmen sieht dann wie folgt

+-----------+-----+---------+ 
|destination| item|itemCount| 
+-----------+-----+---------+ 
|  loc1|item2|  1| 
|  loc2|item1|  1| 
|  loc2|item2|  3| 
|  loc1|item1|  2| 
+-----------+-----+---------+ 

Da wir die am häufigsten Artikel pro Ort, um herauszufinden möchten, lassen Sie uns Partition nach Ziel und die maximale Aggregation über die Säule ItemCount gelten.

val maxWindowSpec = Window.partitionBy("destination") 
val maxColumn = max($"itemCount").over(maxWindowSpec) 
val dfWithMax = dfWithCount.withColumn("maxItemCount",maxColumn) 

Der resultierende Datenrahmen hat sowohl die itemCounts und maxCount des Artikels pro Ziel

+-----------+-----+---------+------------+ 
|destination| item|itemCount|maxItemCount| 
+-----------+-----+---------+------------+ 
|  loc1|item2|  1|   2| 
|  loc1|item1|  2|   2| 
|  loc2|item1|  1|   3| 
|  loc2|item2|  3|   3| 
+-----------+-----+---------+------------+ 

Schließlich Reihen wir herausfiltern, wo die ItemCount für eine bestimmte (Ziel, item) Kombination ist nicht die max Elementanzahl für dieses Ziel.

val result = dfWithMax 
.filter("maxItemCount - itemCount == 0") 
.drop("maxItemCount","itemCount") 

result.show() 

+-----------+-----+ 
|destination| item| 
+-----------+-----+ 
|  loc1|item1| 
|  loc2|item2| 
+-----------+-----+ 
Verwandte Themen