2016-04-02 6 views
3

Ich habe einen Datenrahmen (Funke):Filtering Zeilen basierend auf Spaltenwerte in Funkendatenrahmen scala

id value 
3  0 
3  1 
3  0 
4  1 
4  0 
4  0 

ich einen neuen Datenrahmen erstellen möchten:

3 0 
3 1 
4 1 

benötigen alle Zeilen zu entfernen, nachdem 1 (Wert) für jede ID. Ich habe versucht mit Fensterfunktionen in Spark-Datumsrahmen (Scala). Aber konnte keine Lösung finden. Es sieht so aus, als ob ich in eine falsche Richtung gehe.

Suche nach einer Lösung in Scala.Thanks

Ausgabe mit monotonically_increasing_id

scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value") 
data: org.apache.spark.sql.DataFrame = [id: int, value: int] 

scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx") 
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint] 

scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show 
+---+-----+ 
| id|value| 
+---+-----+ 
| 3| 0| 
| 3| 1| 
| 4| 1| 
+---+-----+ 

Die Lösung wird nicht funktionieren, wenn wir eine sortierte Transformation in dem ursprünglichen Datenrahmen tun. Die monotonically_increasing_id() wird auf der Grundlage von ursprünglichen DF eher diese sortierte DF generiert. Ich habe diese Anforderung zuvor verpasst.

Alle Vorschläge sind willkommen.

+0

Und was hast du versuchen, so weit? – eliasah

+0

@eliasah Ich versuchte einige Experimente basierend auf der Antwort hier http://stackoverflow.com/questions/32148208/how-do-i-compare-multiple-rows-of-a-table-using-spark-sql-data- Rahmen-udf. aber bisher kein Erfolg – John

+0

Ist Ihr DF sortiert? –

Antwort

4

Eine Möglichkeit ist monotonically_increasing_id() und ein von ihm selbst kommen:

val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value") 
data.show 
+---+-----+ 
| id|value| 
+---+-----+ 
| 3| 0| 
| 3| 1| 
| 3| 0| 
| 4| 1| 
| 4| 0| 
| 4| 0| 
+---+-----+ 

Jetzt erzeugen wir eine Spalte idx mit einem Long Erhöhung genannt:

val dataWithIndex = data.withColumn("idx", monotonically_increasing_id()) 
// dataWithIndex.cache() 

wir die min(idx) für jeden id erhalten Jetzt wo value = 1:

val minIdx = dataWithIndex 
       .filter($"value" === 1) 
       .groupBy($"id") 
       .agg(min($"idx")) 
       .toDF("r_id", "min_idx") 

Jetzt verbinden wir die min(idx) wieder auf den ursprünglichen DataFrame:

dataWithIndex.join(
    minIdx, 
    ($"r_id" === $"id") && ($"idx" <= $"min_idx") 
).select($"id", $"value").show 
+---+-----+ 
| id|value| 
+---+-----+ 
| 3| 0| 
| 3| 1| 
| 4| 1| 
+---+-----+ 

Hinweis:monotonically_increasing_id() seinen Wert auf der Partition der Zeile basierend erzeugt. Dieser Wert kann sich jedes Mal ändern, wenn dataWithIndex erneut ausgewertet wird. In meinem Code oben, wegen der faulen Bewertung, ist es nur, wenn ich das letzte show aufrufen, dass monotonically_increasing_id() ausgewertet wird.

Wenn Sie den Wert erzwingen wollen gleich zu bleiben, zum Beispiel, so dass Sie show verwenden können Sie den obigen Schritt-für-Schritt Kommentar- diese Zeile über zu bewerten:

// dataWithIndex.cache() 
+0

Vielen Dank @David Griffin – John

+0

Ich bekomme ein seltsames Verhalten. Ich habe die Ausgabe in der Frage selbst aktualisiert. – John

+0

Ja, schau nicht zu tief in die Spalte, die von 'monotonically_increasing_id()' erzeugt wird - du kannst bei jedem Blick unterschiedliche Werte erhalten - die Zahlen, die du siehst, basieren auf dem Partitionierungsschema. Führe einfach den Code aus, schaue nicht auf die Zwischenwerte. Es klappt. –

0

Hallo Ich fand die Lösung Verwenden von Fenster und Self-Join.

val data = Seq((3,0,2),(3,1,3),(3,0,1),(4,1,6),(4,0,5),(4,0,4),(1,0,7),(1,1,8),(1,0,9),(2,1,10),(2,0,11),(2,0,12)).toDF("id", "value","sorted") 

data.show 

scala> data.show 
+---+-----+------+ 
| id|value|sorted| 
+---+-----+------+ 
| 3| 0|  2| 
| 3| 1|  3| 
| 3| 0|  1| 
| 4| 1|  6| 
| 4| 0|  5| 
| 4| 0|  4| 
| 1| 0|  7| 
| 1| 1|  8| 
| 1| 0|  9| 
| 2| 1| 10| 
| 2| 0| 11| 
| 2| 0| 12| 
+---+-----+------+ 




val sort_df=data.sort($"sorted") 

scala> sort_df.show 
+---+-----+------+ 
| id|value|sorted| 
+---+-----+------+ 
| 3| 0|  1| 
| 3| 0|  2| 
| 3| 1|  3| 
| 4| 0|  4| 
| 4| 0|  5| 
| 4| 1|  6| 
| 1| 0|  7| 
| 1| 1|  8| 
| 1| 0|  9| 
| 2| 1| 10| 
| 2| 0| 11| 
| 2| 0| 12| 
+---+-----+------+ 



var window=Window.partitionBy("id").orderBy("$sorted") 

val sort_idx=sort_df.select($"*",rowNumber.over(window).as("count_index")) 

val minIdx=sort_idx.filter($"value"===1).groupBy("id").agg(min("count_index")).toDF("idx","min_idx") 

val result_id=sort_idx.join(minIdx,($"id"===$"idx") &&($"count_index" <= $"min_idx")) 

result_id.show 

+---+-----+------+-----------+---+-------+ 
| id|value|sorted|count_index|idx|min_idx| 
+---+-----+------+-----------+---+-------+ 
| 1| 0|  7|   1| 1|  2| 
| 1| 1|  8|   2| 1|  2| 
| 2| 1| 10|   1| 2|  1| 
| 3| 0|  1|   1| 3|  3| 
| 3| 0|  2|   2| 3|  3| 
| 3| 1|  3|   3| 3|  3| 
| 4| 0|  4|   1| 4|  3| 
| 4| 0|  5|   2| 4|  3| 
| 4| 1|  6|   3| 4|  3| 
+---+-----+------+-----------+---+-------+ 

Noch auf der Suche für eine optimierte solutions.Thanks

Verwandte Themen