2016-04-14 12 views
0

I Multi RDDs als Ergebnis bekommen haben und wollen, dass sie verschmelzen, sie sind im gleichen Format:was ist die beste Praxis RDDs in scala zu verschmelzen

RDD(id, HashMap[String, HashMap[String, Int]]) 
    ^   ^ ^
    |    |  | 
    identity  category distribution of the category 

Hier ist ein Beispiel dafür, dass rdd ist:

(1001, {age={10=3,15=5,16=8, ...}}) 

die erste Schlüssel String der HashMap[String, HashMap] ist die Kategorie der Statistik und die HashMap[String, Int] im HashMap[String, HashMap] ist die Verteilung der Kategorie. Nachdem jede Verteilung der verschiedenen Kategorien berechnet wurde, möchte ich sie nach der Identität zusammenführen, damit ich die Ergebnisse in der Datenbank speichern kann. Hier ist, was ich habe zur Zeit:

def mergeRDD(rdd1: RDD[(String, util.HashMap[String, Object])], 
       rdd2:RDD[(String, util.HashMap[String, Object])]): RDD[(String, util.HashMap[String, Object])] = { 

    val mergedRDD = rdd1.join(rdd2).map{ 
    case (id, (m1, m2)) => { 
     m1.putAll(m2) 
     (id, m1) 
    } 
    } 
    mergedRDD 
} 
val mergedRDD = mergeRDD(provinceRDD, mergeRDD(mergeRDD(levelRDD, genderRDD), actionTypeRDD)) 

Ich schreibe eine Funktion mergeRDD so, dass ich zwei RDDs jedes Mal zusammenführen kann, aber ich fand, dass Funktion nicht sehr elegant ist, als Neuling zu scala ist jeder inspirierend geschätzt.

+0

Was sind die Eigenschaften Ihrer Zusammenführungsfunktion? – eliasah

+0

@eliasah danke für die Antwort, aber was meinst du mit Eigenschaften? – armnotstrong

Antwort

2

Ich sehe keinen einfachen Weg, dies zu erreichen, ohne die Leistung zu beeinträchtigen. Grund ist, dass Sie nicht einfach zwei rdd zusammenführen, sondern Sie wollen, dass Ihre hashmap konsolidierte Werte nach der Vereinigung von rdd hat.

Jetzt ist Ihre Zusammenführungsfunktion falsch. Im momentanen Zustand wird Join tatsächlich innere Joins durchführen, wobei Zeilen ausgelassen werden, die entweder in rdd nicht vorhanden sind.

Richtiger Weg wäre etwas wie.

val mergedRDD = rdd1.union(rdd2).reduceByKey{ 
    case (m1, m2) => { 
     m1.putAll(m2) 
     } 
} 
+0

danke, dass du darauf hingewiesen hast – armnotstrong

0

Sie können die java.util.HashMap mit scala.collection.immutable.Map

Von dort ersetzen:

val rdds  = List(provinceRDD, levelRDD, genderRDD, actionTypeRDD) 
val unionRDD = rdds.reduce(_ ++ _) 
val mergedRDD = unionRDD.reduceByKey(_ ++ _) 

Dies wird unter der Annahme, dass die Kategorien nicht überlappen zwischen RDDs.

Verwandte Themen