2015-05-14 11 views
22

Lassen Sie uns sagen, ich habe eine ziemlich große Datenmenge in der folgenden Form:Entfernen Duplikate von Zeilen basierend auf bestimmten Spalten in einer RDD/Spark-Dataframe

data = sc.parallelize([('Foo',41,'US',3), 
         ('Foo',39,'UK',1), 
         ('Bar',57,'CA',2), 
         ('Bar',72,'CA',2), 
         ('Baz',22,'US',6), 
         ('Baz',36,'US',6)]) 

Was ich möchte, ist doppelte Zeilen auf der Basis tun entfernen Nur Werte der ersten, dritten und vierten Spalte.

völlig doppelte Zeilen zu entfernen ist einfach:

data = data.distinct() 

und entweder Zeile 5 oder Zeile 6 wird

aber entfernt werden, wie nur ich doppelte Zeilen entfernen 1 Spalten basierend auf, 3 und 4 nur? das heißt entfernen entweder einen der folgenden:

('Baz',22,'US',6) 
('Baz',36,'US',6) 

In Python, dies durch die Angabe Spalten mit .drop_duplicates() getan werden könnte. Wie kann ich das Gleiche in Spark/Pyspark erreichen?

Antwort

22

Pyspark hat gehören ein dropDuplicates() Verfahren. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row 
>>> df = sc.parallelize([ \ 
...  Row(name='Alice', age=5, height=80), \ 
...  Row(name='Alice', age=5, height=80), \ 
...  Row(name='Alice', age=10, height=80)]).toDF() 
>>> df.dropDuplicates().show() 
+---+------+-----+ 
|age|height| name| 
+---+------+-----+ 
| 5| 80|Alice| 
| 10| 80|Alice| 
+---+------+-----+ 

>>> df.dropDuplicates(['name', 'height']).show() 
+---+------+-----+ 
|age|height| name| 
+---+------+-----+ 
| 5| 80|Alice| 
+---+------+-----+ 

Vielleicht in einer späteren Version wurde eingeführt als das, was @ Jason (OP) wurde mit?

bearbeiten: ja es wurde in 1 eingeführt.4

+0

Gibt es eine Möglichkeit, die Datensätze zu erfassen, die es gelöscht hat? – user422930

17

Aus Ihrer Frage ist unklar, in welchen Spalten Sie Dubletten ermitteln möchten. Die allgemeine Idee hinter der Lösung besteht darin, basierend auf den Werten der Spalten, die Duplikate identifizieren, einen Schlüssel zu erstellen. Dann können Sie die Operationen reduceByKey oder reduce verwenden, um Duplikate zu eliminieren.

Hier einige Code ist Ihnen den Einstieg:

def get_key(x): 
    return "{0}{1}{2}".format(x[0],x[2],x[3]) 

m = data.map(lambda x: (get_key(x),x)) 

Nun Sie einen Schlüssel-Wert haben RDD, die 1,3 von Spalten verkeilt ist und 4. Der nächste Schritt ein entweder reduceByKey oder wäre groupByKey und filter. Dies würde Dubletten eliminieren.

r = m.reduceByKey(lambda x,y: (x)) 
7

Ich weiß, dass Sie schon die andere Antwort akzeptiert, aber wenn man dies als Datenrahmen tun will, nur groupBy und agg verwenden. Angenommen, Sie sind bereits erstellt eine DF hatte (mit Spalten namens „col1“, „col2“, etc) Sie tun können:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4") 

Beachten Sie, dass in diesem Fall ich die Max von col2 gewählt, aber man konnte avg tun , min, etc.

+1

Meine bisherigen Erfahrungen mit DataFrames sind, dass sie alles eleganter und viel schneller machen. –

10

Stimmen Sie mit David überein. Um hinzuzufügen, es kann nicht der Fall sein, dass wir wollen groupBy alle Spalten außer die Spalte (n) in Aggregatfunktion dh, wenn wir Duplikate rein auf einer Teilmenge von Spalten entfernen und alle Spalten beibehalten möchten im ursprünglichen Datenrahmen. So ist der bessere Weg, dies zu tun könnte dropDuplicates Datenrahmen api in Funken 1.4.0

als Referenz verwenden möchten, finden Sie unter: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

+0

Haben wir eine entsprechende Funktion in SparkR? – sag

6

Ich verwendete eingebaute Funktion dropDuplicates(). Scala-Code unten angegeben

val data = sc.parallelize(List(("Foo",41,"US",3), 
("Foo",39,"UK",1), 
("Bar",57,"CA",2), 
("Bar",72,"CA",2), 
("Baz",22,"US",6), 
("Baz",36,"US",6))).toDF("x","y","z","count") 

data.dropDuplicates(Array("x","count")).show() 

Ausgang:

+---+---+---+-----+ 
| x| y| z|count| 
+---+---+---+-----+ 
|Baz| 22| US| 6| 
|Foo| 39| UK| 1| 
|Foo| 41| US| 3| 
|Bar| 57| CA| 2| 
+---+---+---+-----+ 
+0

Die Frage fragt speziell nach pyspark Implementierung, nicht skalieren – vaerek

-1

Das ist mein Df enthalten 4 hier zweimal so wiederholt wird, wird wiederholt Werte entfernen.

scala> df.show 
+-----+ 
|value| 
+-----+ 
| 1| 
| 4| 
| 3| 
| 5| 
| 4| 
| 18| 
+-----+ 

scala> val newdf=df.dropDuplicates 

scala> newdf.show 
+-----+ 
|value| 
+-----+ 
| 1| 
| 3| 
| 5| 
| 4| 
| 18| 
+-----+ 
+0

Ich glaube nicht, dass dies eine richtige Antwort ist. – Alex

+0

Sie können in Spark-Shell einchecken ich habe die richtige Ausgabe freigegeben .. diese Ans ist s verwandt, wie wir wiederholte Werte in der Spalte oder df entfernen können .. –

+0

Können Sie ein Beispiel basierend auf OPs Frage? – Alex

Verwandte Themen