0

Was ist die beste gleichwertig mit Spark-Datenrahmen würde
Spark-Update Spaltenwert in einem Dataframe

update table1 set colx = "some value" where coly in (select coltab2 from table2 where [another condition)]

ich eine Arbeitslösung muss SQL, aber ich bin wirklich nicht sehr zufrieden mit ihm. Sieht wirklich umständlich und ich hoffe, dass ich eine einfachere Art und Weise verpassen

Zuerst bekomme ich den Wert für die where-Klausel (es könnte Tausende sein, damit ich eine Sammlung nicht benutzen Stab)

val df2 = xxxx.select("coltab2") 
df2: org.apache.spark.sql.DataFrame = [coltab2: string] 

dieser Datenrahmen enthält Alle Werte, die ich in der WHERE-Klausel beibehalten möchte

Dann führe ich eine linke äußere Verbindung mit Tabelle1, um coltab2 auf df2.coltab2=df1.coly hinzuzufügen. Wenn der Wert des zugegebenen coltab2 nicht null bedeutet dies, dass es in table2 vorlag so verwende ich diese Bedingung eine andere Spalte von der ursprünglichen tabelle1 zu aktualisieren (df1) und dann diese zusätzliche Spalte coltab2 fallen, die nur als Bedingung diente, um eine weitere Spalte zu aktualisieren

val df_updated = df1.join(df2, df1("coly") === df2("coltab2"), "left_outer").withColumn("colx", when(!isnull($"coltab2"), "some value").otherwise(col("colx"))).drop(col("coltab2")) 

Hoffe, dass ich völlig falsch bin und es ist ein effizienter Weg, es zu tun;)

+0

es perfekte Lösung zu sein scheint. Es gibt zwei Verbesserungen, die getan werden müssen. 1 statt left_outer verbinden Sie einfach links verwenden können beitreten und 2 .otherwise (col („COLX“)), wird COLX gerade ist und in, wenn sie nicht verwendet werden können. –

Antwort

0

ich denke, was Sie haben, ist eine saubere Lösung mit guter Lesbarkeit. Wenn Sie möchten, können Sie einen anderen Ansatz mit RDD erkunden. Mit der Annahme, dass Ihre Spaltenliste nicht groß ist, können Sie die Spaltenliste in ein set und Karte colx in df1 entsprechend collect wie folgt:

val df1 = Seq(
    ("x1", "y1"), ("x2", "y2"), ("x3", "y3") 
).toDF("colx", "coly") 

val df2 = Seq(
    ("y1"), ("y3"), ("y5") 
).toDF("coltab2") 

import org.apache.spark.sql.Row 

val colList: Set[String] = df2.rdd.map{ case Row(c: String) => c }.collect.toSet 

val dfUpdated = df1.rdd.map{ 
    case Row(x: String, y: String) => (if (colList contains y) "some value" else x, y) 
    }.toDF("colx", "coly") 

dfUpdated.show 
+----------+----+ 
|  colx|coly| 
+----------+----+ 
|some value| y1| 
|  x2| y2| 
|some value| y3| 
+----------+----+ 
Verwandte Themen