2017-04-14 2 views
0

Ich versuche, die Top-N-Elemente in einem Dataset zu erhalten.Spark Window Funktion top N Artikel Leistungsprobleme

Zunächst habe ich das getan.

var df = Seq((1 , "row1") , (2,"row2"), (1,"row11") , (1 , null)).toDF() 

df=df.select($"_1".alias("p_int"), $"_2".alias("p_string")) 

val resultDf =df.where($"p_string".isNotNull).select($"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "ROWNUMBER").where($"ROWNUMBER" <= 2) 

Aber ich die Leistung Kosten der Operation "wo ($" ROWNUMBER "< = 10)"

So entschied ich folgende

var df = Seq((1 , "row1") , (2,"row2"), (1,"row11") , (1 , null)).toDF() 

df=df.select($"_1".alias("p_int"), $"_2".alias("p_string")) 

val test =df.where($"p_string".isNotNull).select($"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "ROWNUMBER") 

implicit val encoder = RowEncoder(test.schema) 

var temp =test.mapPartitions(_.take(2)) 

jedoch zu tun, um vermeiden möchten, Meine Tests scheinen zu zeigen, dass dies nicht die korrekte Ausgabe liefert.

Irgendwelche Gedanken warum. Würde die Take-Funktion des aus dem Fenster-Dataset erhaltenen Iterators nicht die ersten n Elemente im Iterator erhalten?

Antwort

0

Partitionen der Dataset haben keine eins-zu-eins-Entsprechung mit PARTITION BY Klausel. All die Magie in OVER (PARTITION BY ...) passiert auf der viel niedrigeren Ebene und eine einzige physische Partition wird mehrere IDs verarbeiten.

Auch Sie sparen nicht wirklich die Arbeit. Um korrekt zu generieren, muss Spark alle Daten mischen, sortieren und scannen. Sie benötigen Mechanismen auf niedrigerer Ebene, um vollständiges Shuffle und Sortieren zu vermeiden (zum Beispiel Aggregator mit binärem Heap).