2016-09-01 3 views
1

ich einen pyspark Datenrahmen haben wie: wo c1, c2, c3, c4, c5, c6 sind die SäulenPySpark Datenrahmen identifizieren eindeutigen Wert auf eine Spalte basierend auf doppelte Werte in anderen Spalten

+----------------------------+ 
    |c1 | c2 | c3 | c4 | c5 | c6 | 
    |----------------------------| 
    | a | x | y | z | g | h |  
    | b | m | f | l | n | o |  
    | c | x | y | z | g | h |  
    | d | m | f | l | n | o |  
    | e | x | y | z | g | i | 
    +----------------------------+ 

Ich möchte c1-Werte für die Zeilen extrahieren, die dieselben c2-, c3-, c4-, c5-Werte aber unterschiedliche c1-Werte haben. Wie, 1., 3. & 5. Zeilen haben die gleichen Werte für c2, c3, c4 & c5, aber andere c1-Wert. Die Ausgabe sollte also a, c & e sein.
(Update) ähnlich, 2. & 4. Zeilen haben die gleichen Werte für c2, c3, c4 & c5, aber andere c1-Wert. So sollte der Ausgang auch enthalten b & d

Wie kann ich ein solches Ergebnis erhalten? Ich habe versucht, groupby anzuwenden, aber ich verstehe nicht, wie man unterschiedliche Werte für c1 erhält.

UPDATE:

Ausgabe sollte

# +-------+ 
# |c1_dups| 
# +-------+ 
# | a,c,e| 
# | b,e| 
# +-------+ 

Mein Ansatz ein Datenrahmen von c1 Werte annehmen:

m = data.groupBy('c2','c3','c4','c5) 

aber ich bin nicht zu verstehen, wie die Werte abrufen in m. Ich bin neu Datenrahmen daher sehr verwirrt

+0

Es ist ein wenig schwierig, Ihr Problem zu verstehen. Können Sie das Beispiel für das Ausgabedatenfeld schreiben? –

+0

Danke :) Bitte überprüfen Sie mein Update – Denver

+0

können Sie mit Gruppe von Ansatz aktualisieren, die Sie versucht haben? – eliasah

Antwort

4

Dies ist eigentlich sehr einfach pyspark, lassen Sie uns einige Daten erstellen zuerst:

schema = ['c1','c2','c3','c4','c5','c6'] 

rdd = sc.parallelize(["a,x,y,z,g,h","b,x,y,z,l,h","c,x,y,z,g,h","d,x,f,y,g,i","e,x,y,z,g,i"]) \ 
     .map(lambda x : x.split(",")) 

df = sqlContext.createDataFrame(rdd,schema) 
# +---+---+---+---+---+---+ 
# | c1| c2| c3| c4| c5| c6| 
# +---+---+---+---+---+---+ 
# | a| x| y| z| g| h| 
# | b| x| y| z| l| h| 
# | c| x| y| z| g| h| 
# | d| x| f| y| g| i| 
# | e| x| y| z| g| i| 
# +---+---+---+---+---+---+ 

Jetzt ist der spaßige Teil, werden Sie nur einige Funktionen importieren müssen, Gruppe durch und explodiere wie folgt:

from pyspark.sql.functions import * 

dupes = df.groupBy('c2','c3','c4','c5') \ 
      .agg(collect_list('c1').alias("c1s"),count('c1').alias("count")) \ # we collect as list and count at the same time 
      .filter(col('count') > 1) # we filter dupes 

df2 = dupes.select(explode("c1s").alias("c1_dups")) 

df2.show() 
# +-------+ 
# |c1_dups| 
# +-------+ 
# |  a| 
# |  c| 
# |  e| 
# +-------+ 

Ich hoffe, das beantwortet Ihre Frage.

+0

Vielen Dank !!! – Denver

+0

Aber was passiert, wenn ich mehrere solche Duplikate habe, überprüfe die Frage nach meiner Bearbeitung in der gewünschten Ausgabe – Denver

+0

Ich glaube, dass Sie eine Follow-up-Frage statt der Bearbeitung fragen sollten. Es wird sonst ein Durcheinander sein. – eliasah

Verwandte Themen