du eine Kombination aus groupBy/Aggregatfunktionen und Fensterfunktionen erreichen könnten.
Lassen Sie uns dies nach Ansicht der ordersDf sein:
+-------+--------------+-----------+
|orderId| orderDetails|destination|
+-------+--------------+-----------+
| 1|[11,abc,item1]| loc1|
| 2|[12,abc,item2]| loc1|
| 3|[13,abc,item1]| loc1|
| 4|[14,abc,item1]| loc2|
| 5|[15,abc,item2]| loc2|
| 6|[11,abc,item2]| loc2|
| 7|[11,abc,item2]| loc2|
+-------+--------------+-----------+
Zuerst Gruppe die Daten nach Ziel und Gegenstand und die Häufigkeit der einzelnen Elemente zu zählen.
val dfWithCount = ordersDf
.groupBy("destination","orderDetails.item")
.agg(count("orderDetails.item").alias("itemCount"))
Die aggregierten Datenrahmen sieht dann wie folgt
+-----------+-----+---------+
|destination| item|itemCount|
+-----------+-----+---------+
| loc1|item2| 1|
| loc2|item1| 1|
| loc2|item2| 3|
| loc1|item1| 2|
+-----------+-----+---------+
Da wir die am häufigsten Artikel pro Ort, um herauszufinden möchten, lassen Sie uns Partition nach Ziel und die maximale Aggregation über die Säule ItemCount gelten.
val maxWindowSpec = Window.partitionBy("destination")
val maxColumn = max($"itemCount").over(maxWindowSpec)
val dfWithMax = dfWithCount.withColumn("maxItemCount",maxColumn)
Der resultierende Datenrahmen hat sowohl die itemCounts und maxCount des Artikels pro Ziel
+-----------+-----+---------+------------+
|destination| item|itemCount|maxItemCount|
+-----------+-----+---------+------------+
| loc1|item2| 1| 2|
| loc1|item1| 2| 2|
| loc2|item1| 1| 3|
| loc2|item2| 3| 3|
+-----------+-----+---------+------------+
Schließlich Reihen wir herausfiltern, wo die ItemCount für eine bestimmte (Ziel, item) Kombination ist nicht die max Elementanzahl für dieses Ziel.
val result = dfWithMax
.filter("maxItemCount - itemCount == 0")
.drop("maxItemCount","itemCount")
result.show()
+-----------+-----+
|destination| item|
+-----------+-----+
| loc1|item1|
| loc2|item2|
+-----------+-----+