2017-10-02 1 views
1

ich mit einem Datenrahmen von diesem json erstellt arbeite:Funke - nach einem withColumn („NEWCOL“, collect_list (...)) wählen Reihen mit mehr als einem Element

{"id" : "1201", "name" : "satish", "age" : "25"}, 
{"id" : "1202", "name" : "krishna", "age" : "28"}, 
{"id" : "1203", "name" : "amith", "age" : "39"}, 
{"id" : "1204", "name" : "javed", "age" : "23"}, 
{"id" : "1205", "name" : "mendy", "age" : "25"}, 
{"id" : "1206", "name" : "rob", "age" : "24"}, 
{"id" : "1207", "name" : "prudvi", "age" : "23"} 

Zunächst wird der Datenrahmen sieht aus wie dies:

+---+----+-------+ 
|age| id| name| 
+---+----+-------+ 
| 25|1201| satish| 
| 28|1202|krishna| 
| 39|1203| amith| 
| 23|1204| javed| 
| 25|1205| mendy| 
| 24|1206| rob| 
| 23|1207| prudvi| 
+---+----+-------+ 

Was muss ich Gruppe sind alle Studenten mit dem gleichen Alter, bestellen sie auf ihrer ID abhängig. Dies ist, wie ich das so weit:

* Hinweis: Ich bin mir ziemlich sicher, dass die sind effizienter als das Hinzufügen einer neuen Spalte mit withColumn("newCol", ..), dann verwenden Sie eine select("newCol"), aber ich weiß nicht, wie zu lösen es besser

val conf = new SparkConf().setAppName("SimpleApp").set("spark.driver.allowMultipleContexts", "true").setMaster("local[*]") 
    val sc = new SparkContext(conf) 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sqlContext.read.json("students.json") 

    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.expressions._ 

    val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("List") 

Der Ausgang ich immer bin, ist dies:

[WrappedArray([25,1201,satish], [25,1205,mendy])] 
[WrappedArray([24,1206,rob])] 
[WrappedArray([23,1204,javed])] 
[WrappedArray([23,1204,javed], [23,1207,prudvi])] 
[WrappedArray([28,1202,krishna])] 
[WrappedArray([39,1203,amith])] 

Nun Wie kann ich die Zeilen filtern, die mehr als ein Element bekommen haben? Das heißt, ich will, dass mein letzten Datenrahmen sein:

[WrappedArray([25,1201,satish], [25,1205,mendy])] 
[WrappedArray([23,1204,javed], [23,1207,prudvi])] 

Mein bester Ansatz so weit ist:

val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))) 

val filterd = mergedDF.withColumn("count", count("age").over(Window.partitionBy("age"))).filter($"count" > 1).select("newCol") 

Aber ich muß etwas fehlen, weil das Ergebnis nicht das erwartete ist:

[WrappedArray([23,1204,javed], [23,1207,prudvi])] 
[WrappedArray([25,1201,satish])] 
[WrappedArray([25,1201,satish], [25,1205,mendy])] 

Antwort

2

Sie size() verwenden können, um Ihre Daten zu filtern:

import org.apache.spark.sql.functions.{col,size} 

mergedDF.filter(size(col("newCol"))>1).show(false) 

+---+----+------+-----------------------------------+ 
|age|id |name |newCol        | 
+---+----+------+-----------------------------------+ 
|23 |1207|prudvi|[[23,1204,javed], [23,1207,prudvi]]| 
|25 |1205|mendy |[[25,1201,satish], [25,1205,mendy]]| 
+---+----+------+-----------------------------------+ 
Verwandte Themen