2017-01-06 5 views
1

Ich versuche, mehrere MySQL-Tabellen auf Spark zu verbinden. Einige dieser Tabellen haben doppelte Spaltennamen (jede Tabelle hat ein spezifisches ID-Feld für diese Tabelle).Join-Tabellen mit doppelten Spaltennamen in Spark

Wenn ich versuche zu laufen:

val myDF = session.read.jdbc("t1 inner join t2 on t1.t2_id = t2.id, queryTable, prop) 
myDF.show 

ich java.sql.SQLIntegrityConstraintViolationException: Column 'id' in field list is ambiguous erhalten, da beide Tabellen ein ID-Feld haben (mit unterschiedlichen Bedeutungen)

Ich habe versucht zu tun:

val t1DF = spark.read.jdbc(dbstring, "t1", "id").alias("a") 
val t2DF = spark.read.jdbc(dbstring, "t2", "id").alias("b") 
val joinedDF = t1DF.join(t2DF, Seq("a.t2_id", "b.id")) 
    .selectExpr("ent.id as entity_id", "lnk.pagerank") 

habe ich die Fehler org.apache.spark.sql.AnalysisException: using columns ['t1.t2_id,'t2.id] can not be resolved given input columns: [..] Es scheint, dass der Analyzer nicht weiß, wie er mit Aliasen umgehen soll.

Die einzige Option, die unter Verwendung einer Unterabfrage zu funktionieren scheint:

spark.read.jdbc(dbstring, "(select t1.id as t1_id, t1.t2_id from 
t1 inner join t2 on t1.t2_id = t2.id) t", "t2_id") 

Obwohl in diesem Fall die die Unterabfrage ausgeführt beenden müssen, bevor ich irgendwelche Filter tun, Dinge un-akzeptierbar langsam und alle machen Abfrage Partitionierung nutzlos.

Spark scheint eine interne Art der Disambiguation zwischen IDs id#528 und id#570 zu haben, aber ich kann nicht herausfinden, irgendeine Art von Bezug auf sie in einer Select-Anweisung.

Antwort

0

Ich hatte das gleiche Problem. Die einzige Lösung, die ich gefunden habe, war, ein Suffix für die Spaltennamen hinzuzufügen. Es sieht ungefähr so ​​aus:

val t1DF = spark.read.jdbc(dbstring, "t1", "id").select(col("id").alias("id_t1")) 
val t2DF = spark.read.jdbc(dbstring, "t2", "id").select(col("id").alias("id_t2")) 

val joinedDF = t1DF.join(t2DF, t1DF("id_t1") === t2DF("id_t2")) 
Verwandte Themen