2017-10-31 2 views
3

Ich versuche, die neuesten Datensätze aus einer Tabelle mit Self-Join zu erhalten. Es funktioniert mit spark-sql aber nicht mit Funken DataFrame API arbeiten.Self-Join funktioniert nicht wie erwartet mit der DataFrame-API

Kann jemand helfen? Ist es ein Fehler?

Ich bin mit Spark-2.2.0 im lokalen Modus

Eingang Erstellen DataFrame:

scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time") 
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]  

scala> val df33 = df3 
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field] 

scala> df3.show 
+---+-----+----+ 
| id|value|time| 
+---+-----+----+ 
| 1| a| 1| 
| 1| aa| 2| 
| 2| b| 2| 
| 2| bb| 5| 
+---+-----+----+ 

scala> df33.show 
+---+-----+----+ 
| id|value|time| 
+---+-----+----+ 
| 1| a| 1| 
| 1| aa| 2| 
| 2| b| 2| 
| 2| bb| 5| 
+---+-----+----+ 

nun die Join mithilfe von SQL ausführen:

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show 
+---+-----+----+ 
| id|value|time| 
+---+-----+----+ 
| 1| aa| 2| 
| 2| bb| 5| 
+---+-----+----+ 

Jetzt arbeitet die Durchführung der Join mit Dataframe api: funktioniert nicht

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time"))).select(df33.col("id"),df33.col("value"),df33.col("time")).show 
+---+-----+----+ 
| id|value|time| 
+---+-----+----+ 
+---+-----+----+ 

Das, was zu beachten ist die Pläne erklären: Rohling für die DataFrame API !!

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time"))).select(df33.col("id"),df33.col("value"),df33.col("time")).explain 
== Physical Plan == 
LocalTableScan <empty>, [id#150, value#151, time#152] 

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain 
== Physical Plan == 
*Project [id#1241, value#1242, time#1243] 
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243) 
    :- *Sort [id#150 ASC NULLS FIRST], false, 0 
    : +- Exchange hashpartitioning(id#150, 200) 
    :  +- *Project [_1#146 AS id#150, _3#148 AS time#152] 
    :  +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148] 
    :   +- Scan ExternalRDDScan[obj#145] 
    +- *Sort [id#1241 ASC NULLS FIRST], false, 0 
     +- Exchange hashpartitioning(id#1241, 200) 
     +- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243] 
      +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148] 
       +- Scan ExternalRDDScan[obj#145] 

Antwort

4

Nein, das ist kein Fehler ist, aber wenn Sie neu zuweisen, den Datenrahmen zu einem neuen wie das, was Sie getan haben, ist es die Linie tatsächlich kopiert, aber es dupliziert die Daten nicht. So werden Sie in der gleichen Spalte vergleichen.

Verwendung spark.sql ist etwas anders, weil es tatsächlich ist auf Aliase Ihrer DataFrame s arbeiten

So die richtige Weg, um eine durchzuführen Selbstverknüpfung mithilfe der API ist eigentlich Aliasing Ihre DataFrame wie folgt:

val df1 = Seq((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5)).toDF("id","value","time") 

df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show 
// +---+-----+----+ 
// | id|value|time| 
// +---+-----+----+ 
// | 1| aa| 2| 
// | 2| bb| 5| 
// +---+-----+----+ 

Für weitere Informationen zu Self-Joins empfehle ich, High Performance Spark by Rachel Warren, Holden Karau - Chapter 4 zu lesen.

+0

auch ich dachte, dass es etwas mit dem zu tun hat " doppelte "Spaltennamen. Vielen Dank für die Empfehlung einer Lösung und ein Buch zu verweisen. Ich bin gespannt zu wissen, wie haben Sie festgestellt, dass spark.sql verwendet Alias? – bigdatamann

+0

Gern geschehen! Bitte vergiss nicht zu akzeptieren und upvote :) – eliasah

+1

Ich werde versuchen, Ihre Lösung auf mein aktuelles Problem ein wenig später und aktualisieren - akzeptieren Sie die Antwort. – bigdatamann

0

Während der Arbeit mit spark-sql api, wenn ein dataframe einem anderen Objekt zugewiesen wird, verweisen beide auf denselben Datenspeicherort. Auch wenn Sie select api verwenden, indem Sie alle ursprünglichen columns auswählen, um ein anderes Datenrahmenobjekt zu erstellen, verweisen sie auf denselben Datenspeicherort.

Meine Lösung wäre eine Spalte umbenennen, so dass eine andere Datenstelle, wie unten erstellt wird

val df33 = df3.withColumnRenamed("time", "time2") 

df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time2"))).select(df33.col("id"),df33.col("value"),col("time2").as("time")).show 

Ich hoffe, die Antwort hilfreich

Verwandte Themen