2017-07-07 2 views
0

ich zwei Datenrahmen hava, ist die Anzahl von zwei Datenrahmen gleich, ich will in zwei Datenrahmen die Summe der einzelnen Werte erhalten diese eingegeben wird:Wie in Funken zwei dataframe'value mit scala berechnen

+---+ and  +---+ 
|df1|   |df2| 
+---+   +---+ 
| 11|   | 1| 
| 12|   | 2| 
| 13|   | 3| 
| 14|   | 4| 
| 15|   | 5| 
| 16|   | 6| 
| 17|   | 7| 
| 18|   | 8| 
| 19|   | 9| 
| 20|   | 10| 
+---+   +---+ 

dies ist mein Code:

val df1 = sc.parallelize(1 to 10,2).toDF("df1") 
    val df2 = sc.parallelize(11 to 20,2).toDF("df2") 
    val df3=df1.rdd.zip(df2.rdd).map(x=>{ 
     x._1.getInt(0)+x._2.getInt(0) 
    }).toDF("result") 
    df3.show() 

das Ergebnis:

ich cha haben nge Datenframe zu rdd dann zip zwei rdd, wie man zwei Datenframe mit nicht zu rdd transformieren?

Antwort

1

Sie können einfach die Window Funktion verwenden, um row_number für die Verbindung von dataframes zu erstellen. Nach dem Beitritt summieren Sie einfach die beiden Spalten.

import org.apache.spark.sql.expressions.Window 
import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 

val df1 = sc.parallelize(1 to 10,2).toDF("df1") 
val df2 = sc.parallelize(11 to 20,2).toDF("df2") 

df1.withColumn("rowNo", row_number() over Window.orderBy("df1")) 
    .join(df2.withColumn("rowNo", row_number() over Window.orderBy("df2")), Seq("rowNo")) 
    .select(($"df1"+$"df2").alias("result")) 
    .show(false) 
+0

Gibt es einen effizienteren Weg, dies zu tun? – mentongwu

+0

Fenster wird Partition ändern, wenn ich Fenster verwenden muss ich neu partitionieren, wenn ich es berechnet – mentongwu

+0

Yes Window-Funktion wird definitiv Partition ändern. Wenn Sie nicht alle Ihre Daten in einer Partition zusammenfassen, wie würden Sie sicherstellen, dass 11 von df2 und 1 von df1 auf demselben Executor sind. Wir können nicht zwei Datenrahmen verbinden, ohne sicherzustellen, dass sich alle Partitionen auf demselben Executor befinden. Ihre Anforderung ist so. –

0

können Sie monotonically_increasing_id() verwenden, um eine ID für beide Datenrahmen zu geben, und kommen mit zwei Spalten hinzufügen.

import spark.implicits._ 
val df1 = spark.sparkContext.parallelize(11 to 20).toDF("df1") 
val df2 = spark.sparkContext.parallelize((1 to 10)).toDF("df2") 

df1.withColumn("id", monotonically_increasing_id()) 
    .join(df2.withColumn("id", monotonically_increasing_id()), "id") 
    .withColumn("result", ($"df1" + $"df2")).drop("id").show 

Ausgang:

+---+---+------+ 
|df1|df2|result| 
+---+---+------+ 
| 11| 1| 12| 
| 18| 8| 26| 
| 17| 7| 24| 
| 20| 10| 30| 
| 16| 6| 22| 
| 12| 2| 14| 
| 14| 4| 18| 
| 19| 9| 28| 
| 13| 3| 16| 
| 15| 5| 20| 
+---+---+------+ 

hoffe, das hilft!

+0

Ich hoffe das hat geholfen! –