2016-04-08 7 views
0

Ich versuche eine bestimmte Art der Filterung mit Spark durchzuführen. Ich habe einen Datenrahmen, der wie folgt aussieht:Datenfilterung in Spark

ID Property#1 Property#2 Property#3 
----------------------------------------- 
01 a   b   c 
01 a   X   c 
02 d   e   f 
03 i   j   k 
03 i   j   k 

Ich erwarte, dass die Eigenschaften für eine bestimmte ID identisch sind. In dem obigen Beispiel würde Ich mag die folgende, um herauszufiltern:

ID Property#2 
--------------- 
01 b 
01 X 

Beachten Sie, dass es okay ist, für IDs in den Datenrahmen wiederholt werden, solange die Eigenschaften gleich sind (zB ID ‚03‘ in die erste Tabelle). Der Code muss so effizient wie möglich sein, da ich plane, ihn auf Datensätze mit> 10k Zeilen anzuwenden. Ich habe versucht, die verschiedenen Zeilen mit der Funktion distinct in DataFrame API extrahieren, gruppierte sie in der ID-Spalte mit groupBy und aggregierte die Ergebnisse mit countDistinct Funktion, aber leider konnte ich nicht eine funktionierende Version des Codes. Auch die Art, wie ich es implementiert habe, scheint ziemlich langsam zu sein. Ich habe mich gefragt, ob irgendjemand Hinweise geben kann, wie man dieses Problem angeht.

Danke!

+0

Es scheint, dass Sie 'müssen groupBy' von' id' zuerst, dann 'filter' von' value .size> 1', dann 'flatMap' Werte im ursprünglichen ungruppierten Format. – Aivean

+0

'groupBy' gibt ein Objekt vom Typ' GroupedData' zurück. Ich glaube nicht, dass Sie 'Filter' auf' GroupedData' anwenden können, wenn ich nicht etwas verpasse? – bbtus

+0

Auch scheint es die meiste Zeit in meinem Code in 'groupBy' verbracht (obwohl ich es auf den DataFrame und nicht die zugrunde liegende RDD anwenden). Gibt es eine Möglichkeit, die Gruppierung zu vermeiden? – bbtus

Antwort

0

Sie können zum Beispiel aggregieren und beitreten. Zuerst werden Sie eine Lookup-Tabelle erstellen haben:

val df = Seq(
    ("01", "a", "b", "c"), ("01", "a", "X", "c"), 
    ("02", "d", "e", "f"), ("03", "i", "j", "k"), 
    ("03", "i", "j", "k") 
).toDF("id", "p1", "p2", "p3") 

val lookup = df.distinct.groupBy($"id").count 

dann die Datensätze filtern:

df.join(broadcast(lookup), Seq("id")) 

df.join(broadcast(lookup), Seq("id")).where($"count" !== 1).show 
// +---+---+---+---+-----+ 
// | id| p1| p2| p3|count| 
// +---+---+---+---+-----+ 
// | 01| a| b| c| 2| 
// | 01| a| X| c| 2| 
// +---+---+---+---+-----+