2017-08-22 1 views
0

Angenommen, ich habe zwei Datenrahmen wie folgt vor:in Funkendatenrahmen (scala), basierend auf keine Nullwerte

Erste -

A | B | C | D 
1a | 1b | 1c | 1d 
2a | null | 2c | 2d 
3a | null | null | 3d 
4a | 4b | null | null 
5a | null | null | null 
6a | 6b | 6c | null 

Second -

P | B | C | D 
1p | 1b | 1c | 1d 
2p | 2b | 2c | 2d 
3p | 3b | 3c | 3d 
4p | 4b | 4c | 4d 
5p | 5b | 5c | 5d 
6p | 6b | 6c | 6d 

Die Join-Operation ist basierend auf {"B", "C", "D"} durchgeführt. Wenn in einer dieser Spalten NULL vorkommt, sollte in den verbleibenden Spalten nach Werten gesucht werden, die nicht Null sind.

So sollte das Ergebnis sein wie -

P | B | C | D | A 
1p | 1b | 1c | 1d | 1a 
2p | null | 2c | 2d | 2a 
3p | null | null | 3d | 3a 
4p | 4b | null | null | 4a // First(C) & First(D) was null so we take only B 
6p | 6b | 6c | null | 6a 

jemand eine Lösung für diese Abfrage vorschlagen kann? Momentan versuche ich Werte mit Nullwerten in einer einzelnen Spalte, zwei Spalten, drei Spalten zu filtern. Dann verbinden Sie sie mit Second, ohne diese Spalte zu nehmen. Für zB - habe ich zunächst Werte mit nur B als Null von First herausgefiltert. Dann verbinden Sie es mit Second basierend auf "C" und "D". Auf diese Weise werde ich viele Datenframes bekommen, und ich werde sie schließlich verbinden.

Antwort

1

Hier ist, was Sie

tun können
import org.apache.spark.sql.functions._ 
df1.join(broadcast(df2), df1("B") === df2("B") || df1("C") === df2("C") || df1("D") === df2("D")) 
    .drop(df2("B")) 
    .drop(df2("C")) 
    .drop(df2("D")) 
    .show(false) 

sicherer sein können Sie broadcast die dataframe, die kleiner ist.

+0

Ihre Lösung ist zu einem gewissen Grad richtig. Also habe ich deine Lösung auf diese erweitert - '"df1.join (broadcast (df2), (((df2 (" B ") === null) || (df1 (" B ") === df2 (" B "))) && ((df2 (" C ") === null) || (df1 (" C ") === df2 (" C "))) && ((df2 (" D ") ===) null) || (df1 ("D") === df2 ("(D"))))) .drop (df2 ("B")) .drop (df2 ("C")) .drop (df2 ("D")) .show (false) " Es bedeutet, dass es df1 (Col) sollte entweder null oder gleich df2 (Col) sein Aber immer noch bekomme ich nur einzelne Zeile - 1a | 1p | 1b | 1c | 1d Aber nicht andere. Irgendeine Idee warum? – Ishan

+0

ist es nicht die Lösung für Ihre Frage? :) hehehe. Das ist, weil Sie && verwenden. Und warum benutzt du 'df2 (" B ") === null)'? Null-Werte sind in DF1, oder? –

+0

Ich weiß nicht, was Sie versuchen, aber Sie sollten 'df1 versuchen.join (broadcast (df2), ( ((df2 ("B") === "null") || (df1 ("B") === df2 ("B"))) && ((df2 ("C") === "null") || (df1 ("C") === df2 ("C"))) && ((df2 ("D") === "null") || (df1 ("D") === df2 ("(D"))))) .drop (df2 ("B")) .drop (df2 ("C")) .drop (df2 ("D")) .show (false) '. Aber Sie sollten wissen, welcher Datenrahmen die Nullwerte hat. Entsprechend Ihrer Frage hat df1 null Werte nicht df2 –

0

Ich denke, links anschließen sollte die Arbeit machen, versuchen Sie den folgenden Code ein:

val group = udf((p1: String, p2: String, p3: String) => if (p1 != null) p1 else if (p2 != null) p2 else if (p3 != null) p3 else null) 
val joined = first.join(second.select("B", "P"), Seq("B"), "left") 
        .withColumnRenamed("P", "P1") 
        .join(second.select("C", "P"), Seq("C"), "left") 
        .withColumnRenamed("P", "P2") 
        .join(second.select("D", "P"), Seq("D"), "left") 
        .withColumnRenamed("P", "P3") 
        .select($"A", $"B", $"C", $"D", group($"P1", $"P2", $"P3") as "P") 
        .where($"P".isNotNull) 

Hope this Sie hilft, sonst äußern Ihre Probleme

+0

Vielen Dank für Ihre Antwort. Ich habe meine Frage bearbeitet. Es gab einen Fehler. Und ich habe versucht, Ihre Lösung zu verwenden. Aber ich bekomme "Null" -Wert statt "4a" (siehe in der kommentierten Zeile) – Ishan

+0

Was ist, wenn Sie versuchen, nur auf 'B' @Ishan beizutreten? –

+0

Ich muss zuerst auf der Basis von B, C und D beitreten. Wenn eine Spalte null hat, dann muss ich basierend auf zwei anderen beitreten. Ähnlich, wenn zwei Spalten gleichzeitig Nullwerte haben, schließe ich mich dem dritten an. – Ishan