2017-11-13 1 views
1

In einem Datenframe versuche ich diejenigen Zeilen zu identifizieren, die einen Wert in Spalte C2 haben, der in Spalte C1 in keiner anderen Zeile existiert. Ich habe versucht den folgenden Code:Warum left_anti Join funktioniert nicht wie erwartet in Pyspark?

in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3']) 
in_df.show() 
    +---+----+---+ 
    | C1| C2| C3| 
    +---+----+---+ 
    | 1|null| A| 
    | 2| 1| B| 
    | 3|null| C| 
    | 4| 11| D| 
    +---+----+---+ 
filtered = in_df.filter(in_df.C2.isNotNull()) 
filtered.show() 
    +---+---+---+ 
    | C1| C2| C3| 
    +---+---+---+ 
    | 2| 1| B| 
    | 4| 11| D| 
    +---+---+---+ 

nun eine left_anti Anwendung join nur die Zeile 4 zurück erwartet wird, aber auch erhalte ich Zeile 2:

filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show() 
    +---+---+---+ 
    | C1| C2| C3| 
    +---+---+---+ 
    | 2| 1| B| 
    | 4| 11| D| 
    +---+---+---+ 

Wenn ich ‚materialisieren‘ das gefilterte DF die Ergebnis ist wie erwartet:

filtered = filtered.toDF(*filtered.columns) 
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show() 
    +---+---+---+ 
    | C1| C2| C3| 
    +---+---+---+ 
    | 4| 11| D| 
    +---+---+---+ 

Warum wird diese .toDF benötigt?

Antwort

1

in_df.C1 ist beziehe tatsächlich zu einer filtered Spalte wie der folgende Code zeigt:

in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3']) 
filtered = in_df.filter(in_df.C2.isNotNull()).select("C2") 
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show() 

Py4JJavaError: An error occurred while calling o699.join. : org.apache.spark.sql.AnalysisException: cannot resolve ' in_df.C1 ' given input columns: [C2, C1, C2, C3];; 'Join LeftAnti, ('in_df.C1 = 'filtered.C2) :- Project [C2#891L] : +- Filter isnotnull(C2#891L) : +- LogicalRDD [C1#890L, C2#891L, C3#892] +- LogicalRDD [C1#900L, C2#901L, C3#902]

Also im Grunde, wenn die zwei Datenrahmen verbinden verwenden Sie tatsächlich die Bedingung filtered.C1 == filtered.C2:

filtered = in_df.filter(in_df.C2.isNotNull()) 
filtered.join(in_df,(filtered.C1 == filtered.C2), 'left_anti').show() 

    +---+---+---+ 
    | C1| C2| C3| 
    +---+---+---+ 
    | 2| 1| B| 
    | 4| 11| D| 
    +---+---+---+ 

Möglicherweise haben Sie den Namen des Datenrahmens geändert, aber die darin enthaltenen Spalten können immer noch mit dem Aufrufabgerufen werden. Um sicherzustellen, dass Sie den richtigen Datenrahmen sich beziehen können Sie Aliasnamen verwenden:

import pyspark.sql.functions as psf 
filtered.alias("filtered").join(in_df.alias("in_df"),(psf.col("in_df.C1") == psf.col("filtered.C2")), 'left_anti').show() 

    +---+---+---+ 
    | C1| C2| C3| 
    +---+---+---+ 
    | 4| 11| D| 
    +---+---+---+ 

Der besten Weg, mit Spaltennamen Zweideutigkeiten zu behandeln ist, sie von Anfang an zu vermeiden (das Umbenennen von Spalten oder Aliase für Ihren Datenrahmen verwenden) .

+0

Ihre Antwort erklärt sehr gut, was bei meinem ersten Versuch schief gelaufen ist. Mein zweiter Versuch nach dem Anwenden von 'filtered = filtered.toDF' auf den identisch codierten Join scheint jedoch kein Problem zu haben, zwischen linken und rechten Spalten zu unterscheiden. Warum das? – heinzK

Verwandte Themen