2017-08-12 2 views
1

ich einen Eingang Datenrahmen des I Summe berechnen, Graf von mehreren Top-K-Werte auslösen

+---------------------------------+ 
|name| values |score |row_number| 
+---------------------------------+ 
|A |1000 |0  |1  | 
|B |947 |0  |2  | 
|C |923 |1  |3  | 
|D |900 |2  |4  | 
|E |850 |3  |5  | 
|F |800 |1  |6  | 
+---------------------------------+ 

Format haben

müssen SUM Summe (Werte) zu erhalten, wenn Score> 0 und row_number < K (i, e) aller Werte, wenn der Wert> 0 für die oberen k Werte im Datenrahmen ist.

Ich bin in der Lage, dies zu erreichen, indem die folgende Abfrage für Top-100-Werte

val top_100_data = df.select(
     count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"), 
     sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"), 
     sum(when(col("row_number") <=100, col(values))).alias("total_sum_100") 
    ) 

jedoch ausgeführt wird, muss ich Daten für Top 100.200.300 ...... 2500 holen. Das würde bedeuten, dass ich diese Abfrage 25 Mal ausführen und schließlich 25 Datenfelder zusammenführen müsste.

Ich bin neu zu funken und immer noch viele Dinge aus. Was wäre der beste Ansatz, um dieses Problem zu lösen?

Danke !!

Antwort

1

Sie können eine Array von Grenzen als

val topFilters = Array(100, 200, 300) // you can add more 

erstellen Dann können Sie Schleife durch die topFilters Array und erstellen Sie die dataframe Sie benötigen. Ich schlage vor, Sie verwenden join anstatt union als join geben Sie separate columns und unions geben Sie separate rows. Sie können die folgenden Schritte aus

Angesichts Ihrer dataframe als

+----+------+-----+----------+ 
|name|values|score|row_number| 
+----+------+-----+----------+ 
|A |1000 |0 |1   | 
|B |947 |0 |2   | 
|C |923 |1 |3   | 
|D |900 |2 |200  | 
|E |850 |3 |150  | 
|F |800 |1 |250  | 
+----+------+-----+----------+ 

Sie mithilfe der topFilters Array wie oben

definiert tun können
import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 
var finalDF : DataFrame = Seq("1").toDF("rowNum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k")) 
    finalDF = finalDF.join(top_100_data, Seq("rowNum")) 
} 
finalDF.show(false) 

Was du letzte dataframe als

geben sollte
+------+-------------+-------------+-------------+ 
|rowNum|total_sum_100|total_sum_200|total_sum_300| 
+------+-------------+-------------+-------------+ 
|1  |923   |1773   |3473   | 
+------+-------------+-------------+-------------+ 

Sie kann dasselbe für Ihre 25 Grenzen tun, die Sie haben.

Wenn Sie beabsichtigen, union zu verwenden, dann ist die Idee ähnlich wie oben.

Ich hoffe, die Antwort hilfreich

Aktualisiert

Wenn Sie Union benötigen, dann können Sie über

var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"), 
    sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"), 
    sum(when(col("row_number") <=k, col("values"))).alias("total_sum")) 
    finalDF = finalDF.union(top_100_data) 
} 
finalDF.filter(col("limit") =!= 0).show(false) 

definiert folgende Logik mit dem gleichen Limit Array anzuwenden, die Sie

geben sollte
+-----+-----+------------+---------+ 
|limit|count|sum_filtered|total_sum| 
+-----+-----+------------+---------+ 
|100 |1 |923   |2870  | 
|200 |3 |2673  |4620  | 
|300 |4 |3473  |5420  | 
+-----+-----+------------+---------+ 
+0

Hallo! Danke für die Antwort, Das war sehr hilfreich !!. Also würde ich für jedes K 3 Spalten benötigen (sum_100_filtered_score, total_sum_100, count_filtered_score_100). Wenn ich dem Datensatz beitrete, erhalte ich für jedes Feld eine Spalte. Deshalb versuche ich Union – Vignesh

+0

dafür zu benutzen. :) statt beitreten können Sie Union verwenden. –

+0

sehe meine aktualisierte Antwort :) Sie können akzeptieren und upvote, wenn es Ihnen wirklich geholfen hat –