2016-03-31 4 views
0

Ich arbeite an Spark-Datenframes und ich muss eine Gruppe von einer Spalte und konvertieren Sie die Spaltenwerte von gruppierten Zeilen in ein Array von Elementen als neue Spalte. Beispiel:Konvertieren von Zeilenwerten in ein Spaltenarray in Spark Dataframe

Input: 

employee | Address 
------------------ 
Micheal | NY 
Micheal | NJ 

Output: 

employee | Address 
------------------ 
Micheal | (NY,NJ) 

Jede Hilfe ist sehr willkommen.! Hier

+0

Scheint, wie Sie groupByKey verwenden können, um zu bekommen, was Sie wollen, was Sie Iterable von [Adresse] geben. – Manas

+0

@Manas das ist der Fehler ich bekomme GroupByKey ist kein Mitglied von org.apache.spark.sql.DataFrame – vds

+2

zeigen Sie uns Ihren Code ..... –

Antwort

5

ist eine alternative Lösung Wo ich den Datenrahmen zu einem rdd umgewandelt habe für die Transformationen und wandelte sie einen Datenrahmen sqlContext.createDataFrame()

Sample.json mit zurück

{"employee":"Michale","Address":"NY"} 
{"employee":"Michale","Address":"NJ"} 
{"employee":"Sam","Address":"NY"} 
{"employee":"Max","Address":"NJ"} 

Spark-Anwendung

val df = sqlContext.read.json("sample.json") 

// Printing the original Df 
df.show() 

//Defining the Schema for the aggregated DataFrame 
val dataSchema = new StructType(
    Array(
    StructField("employee", StringType, nullable = true), 
    StructField("Address", ArrayType(StringType, containsNull = true), nullable = true) 
) 
) 
// Converting the df to rdd and performing the groupBy operation 
val aggregatedRdd: RDD[Row] = df.rdd.groupBy(r => 
      r.getAs[String]("employee") 
     ).map(row => 
      // Mapping the Grouped Values to a new Row Object 
      Row(row._1, row._2.map(_.getAs[String]("Address")).toArray) 
     ) 

// Creating a DataFrame from the aggregatedRdd with the defined Schema (dataSchema) 
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, dataSchema) 

// Printing the aggregated Df 
aggregatedDf.show() 

Ausgabe:

+-------+--------+---+ 
|Address|employee|num| 
+-------+--------+---+ 
|  NY| Michale| 1| 
|  NJ| Michale| 2| 
|  NY|  Sam| 3| 
|  NJ|  Max| 4| 
+-------+--------+---+ 

+--------+--------+ 
|employee| Address| 
+--------+--------+ 
|  Sam| [NY]| 
| Michale|[NY, NJ]| 
|  Max| [NJ]| 
+--------+--------+ 
+0

Diese Antwort ist in Ordnung, aber die Verwendung der RDD-API ist langsamer als die Verwendung von DataFrame API mit großem Abstand (aufgrund des Fehlens von Abfrageoptimierer und Wolfram) – tribbloid

-1

Sie können versuchen, und groupBy verwenden und dann schwenken:

val dfPivot = df.groupBy("employee").pivot("Address").max() 
Verwandte Themen