2017-08-29 1 views
1

Zünd-/Scala: Verschachtelte Struktur anlegen ReduceByKey mit mit RDD nurZünd-/Scala: Verschachtelte Struktur anlegen ReduceByKey mit mit RDD nur

ich verschachtelte Struktur erstellen möchten nur RDD verwenden. Ich kann dies mit der Funktion groupBy tun, die bei großen Daten nicht gut funktioniert. Also möchte ich es mit reduceByKey machen, aber ich bekomme nicht, was ich will. Jede Hilfe wäre willkommen.

Eingabedaten:

val sales=sc.parallelize(List(
    ("West", "Apple", 2.0, 10), 
    ("West", "Apple", 3.0, 15), 
    ("West", "Orange", 5.0, 15), 
    ("South", "Orange", 3.0, 9), 
    ("South", "Orange", 6.0, 18), 
    ("East", "Milk", 5.0, 5))) 

erforderliche Ausgabe ist Liste der Structs. Ich bin in der Lage dies mit groupByKey Wie unten zu tun:

sales.map(value => (value._1 ,(value._2,value._3,value._4 ))) 
    .groupBy(_._1) 
    .map { case(k,v) => (k, v.map(_._2)) } 
    .collect() 
    .foreach(println) 

// (South,List((Orange,3.0,9), (Orange,6.0,18))) 
// (East,List((Milk,5.0,5))) 
// (West,List((Apple,2.0,10), (Apple,3.0,15), (Orange,5.0,15))) 

Aber ich möchte die gleiche Sache reduceByKey mit erreichen. Ich bin nicht in der Lage, List [Struct] zu bekommen. Stattdessen kann ich List [List] bekommen. Gibt es eine Möglichkeit List [Struct] zu bekommen?

sales.map(value => (value._1 ,List(value._2,value._3,value._4))) 
    .reduceByKey((a,b) => (a ++ b)) 
    .collect() 
    .foreach(println) 

// (South,List(Orange, 3.0, 9, Orange, 6.0, 18)) 
// (East,List(Milk, 5.0, 5)) 
// (West,List(Apple, 2.0, 10, Apple, 3.0, 15, Orange, 5.0, 15)) 

sales.map(value => (value._1 ,List(value._2,value._3,value._4))) 
    .reduceByKey((a,b) =>(List(a) ++ List(b))) 
    .collect() 
    .foreach(println) 

// (South,List(List(Orange, 3.0, 9), List(Orange, 6.0, 18))) 
// (East,List(Milk, 5.0, 5)) 
// (West,List(List(List(Apple, 2.0, 10), List(Apple, 3.0, 15)), List(Orange, 5.0, 15))) 

Antwort

2
  • Sie können nicht-reduceByKey erfordert eine Funktion (V, V) ⇒ V kann daher nicht die Typen ändern. Siehe zum Beispiel Can reduceBykey be used to change type and combine values - Scala Spark?
  • Sie aggregateByKey oder combineByKey verwenden können, aber es wird nicht die Leistung verbessern weil Ihre Prozessdatenmenge nicht reduzieren. Siehe zum Beispiel Spark groupByKey alternative.
  • Sie ein wenig mit (keine Notwendigkeit für temporäre Objekte) gewinnen können:

    sales.map(value => (value._1 ,(value._2,value._3,value._4))).groupByKey 
    
Verwandte Themen