ich mit einem Datenrahmen von diesem json erstellt arbeite:Funke - nach einem withColumn („NEWCOL“, collect_list (...)) wählen Reihen mit mehr als einem Element
{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "39"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}
Zunächst wird der Datenrahmen sieht aus wie dies:
+---+----+-------+
|age| id| name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 39|1203| amith|
| 23|1204| javed|
| 25|1205| mendy|
| 24|1206| rob|
| 23|1207| prudvi|
+---+----+-------+
Was muss ich Gruppe sind alle Studenten mit dem gleichen Alter, bestellen sie auf ihrer ID abhängig. Dies ist, wie ich das so weit:
* Hinweis: Ich bin mir ziemlich sicher, dass die sind effizienter als das Hinzufügen einer neuen Spalte mit withColumn("newCol", ..)
, dann verwenden Sie eine select("newCol")
, aber ich weiß nicht, wie zu lösen es besser
val conf = new SparkConf().setAppName("SimpleApp").set("spark.driver.allowMultipleContexts", "true").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = sqlContext.read.json("students.json")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("List")
Der Ausgang ich immer bin, ist dies:
[WrappedArray([25,1201,satish], [25,1205,mendy])]
[WrappedArray([24,1206,rob])]
[WrappedArray([23,1204,javed])]
[WrappedArray([23,1204,javed], [23,1207,prudvi])]
[WrappedArray([28,1202,krishna])]
[WrappedArray([39,1203,amith])]
Nun Wie kann ich die Zeilen filtern, die mehr als ein Element bekommen haben? Das heißt, ich will, dass mein letzten Datenrahmen sein:
[WrappedArray([25,1201,satish], [25,1205,mendy])]
[WrappedArray([23,1204,javed], [23,1207,prudvi])]
Mein bester Ansatz so weit ist:
val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id")))
val filterd = mergedDF.withColumn("count", count("age").over(Window.partitionBy("age"))).filter($"count" > 1).select("newCol")
Aber ich muß etwas fehlen, weil das Ergebnis nicht das erwartete ist:
[WrappedArray([23,1204,javed], [23,1207,prudvi])]
[WrappedArray([25,1201,satish])]
[WrappedArray([25,1201,satish], [25,1205,mendy])]