2016-09-08 2 views
5

Ich habe einen Datensatz Daten wie die folgenden enthalten:GroupByKey mit Datensätzen in Spark-2.0 unter Verwendung von Java

|c1| c2| 
--------- 
| 1 | a | 
| 1 | b | 
| 1 | c | 
| 2 | a | 
| 2 | b | 

...

Jetzt möchte ich die Daten wie folgt (col1 gruppiert bekommen : String Key, col2: Liste):

| c1| c2 | 
----------- 
| 1 |a,b,c| 
| 2 | a, b| 
... 

ich dachte, dass goupByKey mit einer ausreichenden Lösung sein würde, aber ich kann kein Beispiel nicht finden, wie es zu benutzen.

Kann mir jemand helfen, eine Lösung mit groupByKey zu finden oder eine andere Kombination von Transformationen und Aktionen zu verwenden, um diese Ausgabe mit Hilfe von Datasets, nicht RDD?

Antwort

2

Hier ist Spark-2.0 und Java-Beispiel mit Dataset.

public class SparkSample { 
    public static void main(String[] args) { 
    //SparkSession 
    SparkSession spark = SparkSession 
      .builder() 
      .appName("SparkSample") 
      .config("spark.sql.warehouse.dir", "/file:C:/temp") 
      .master("local") 
      .getOrCreate();  
    //input data 
    List<Tuple2<Integer,String>> inputList = new ArrayList<Tuple2<Integer,String>>(); 
    inputList.add(new Tuple2<Integer,String>(1, "a")); 
    inputList.add(new Tuple2<Integer,String>(1, "b")); 
    inputList.add(new Tuple2<Integer,String>(1, "c")); 
    inputList.add(new Tuple2<Integer,String>(2, "a")); 
    inputList.add(new Tuple2<Integer,String>(2, "b"));   
    //dataset 
    Dataset<Row> dataSet = spark.createDataset(inputList, Encoders.tuple(Encoders.INT(), Encoders.STRING())).toDF("c1","c2"); 
    dataSet.show();  
    //groupBy and aggregate 
    Dataset<Row> dataSet1 = dataSet.groupBy("c1").agg(org.apache.spark.sql.functions.collect_list("c2")).toDF("c1","c2"); 
    dataSet1.show(); 
    //stop 
    spark.stop(); 
    } 
} 
+0

Glad to DataSet konnte ich helfen. – abaghel

+0

Danke, es funktioniert! –

1

Mit einem Datenrahmen in Spark-2.0:

scala> val data = List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b")).toDF("c1", "c2") 
data: org.apache.spark.sql.DataFrame = [c1: int, c2: string] 
scala> data.groupBy("c1").agg(collect_list("c2")).collect.foreach(println) 
[1,WrappedArray(a, b, c)] 
[2,WrappedArray(a, b)] 
0

Dies wird die Tabelle in Variable lesen

Dataset<Row> datasetNew = dataset.groupBy("c1").agg(functions.collect_list("c2")); 
datasetNew.show() 
Verwandte Themen