2016-06-09 7 views
0

Ich muss ein Szenario in Spark mit Scala API schreiben. Ich übergebe eine benutzerdefinierte Funktion an einen Dataframe, der jede Reihe von Datenrahmen nacheinander verarbeitet und Tupel (Row, Row) zurückgibt. Wie kann ich RDD (Zeile, Zeile) zu Dataframe (Zeile) ändern? Im folgenden sehen Sie Codebeispiel -Spark - So konvertieren Sie Kartenfunktionsausgabe (Zeile, Zeile) Tupel zu einem Dataframe

**Calling map function-** 
    val df_temp = df_outPut.map { x => AddUDF.add(x,date1,date2)} 
**UDF definition.** 
    def add(x: Row,dates: String*): (Row,Row) = { 
...................... 
........................ 
    var result1,result2:Row = Row() 
.......... 
    return (result1,result2) 

Jetzt df_temp ist ein RDD (Row1, Row2). Meine Anforderung besteht darin, es zu einem RDD oder Dataframe zu machen, indem Tupelelemente auf 1 RDD-Datensatz oder Dataframe RDD (Zeile) aufgeteilt werden. Schätze deine Hilfe.

+0

Wie möchten Sie die zwei Row-Elemente kombiniert werden? Sollten die Spalten der zweiten an die der ersten angehängt werden? Könnte es gemeinsame Spalten in beiden Reihen geben? Die Frage ist ohne diese Information unklar. –

Antwort

2

Sie flatMap können Ihre Row Tupel zu glätten, sagen, wenn wir aus diesem Beispiel rdd starten:

rddExample.collect() 
// res37: Array[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = Array(([1,2],[3,4]), ([2,1],[4,2])) 

val flatRdd = rddExample.flatMap{ case (x, y) => List(x, y) } 
// flatRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[45] at flatMap at <console>:35 

Um zu wandeln es in Datenrahmen.

import org.apache.spark.sql.types.{StructType, StructField, IntegerType} 

val schema = StructType(StructField("x", IntegerType, true):: 
         StructField("y", IntegerType, true)::Nil)  
val df = sqlContext.createDataFrame(flatRdd, schema) 
df.show 
+---+---+ 
| x| y| 
+---+---+ 
| 1| 2| 
| 3| 4| 
| 2| 1| 
| 4| 2| 
+---+---+ 
+0

Arbeitete wie Charme. Danke vielmals :) –

Verwandte Themen