2016-05-13 10 views
0

Um genauer zu sein, konvertieren, wie kann ich ein scala.Iterable zu einem org.apache.spark.rdd.RDD konvertieren?Wie ein Iterable zu einem RDD

Ich habe eine RDD von (String, Iterable [(String, Integer)]) und ich will diese von (String, RDD [String, Integer]) in eine RDD umgewandelt werden so dass ich eine reduceByKey Funktion auf den internen RDD anwenden kann.

zB ich habe eine RDD, wo die Taste 2 geschriebenes ist Präfix des Namens einer Person und der Wert Liste der Paare von Personennamen und Stunden, die sie in einem Fall ausgegeben

meine RDD ist:

("To", List(("Tom",50),("Tod","30"),("Tom",70),("Tod","25"),("Tod",15)) ("Ja", List(("Jack",50),("James","30"),("Jane",70),("James","25"),("Jasper",15))

Ich brauche die Liste in RDD umgewandelt werden, so dass ich akkumulieren kann jede Person verbrachte Stunden insgesamt. Anwenden reduceByKey und machen das Ergebnis als ("To", RDD(("Tom",120),("Tod","70")) ("Ja", RDD(("Jack",120),("James","55"),("Jane",15))

Aber ich fand keine solche Transformation-Funktion. Wie kann ich das machen ?

Vielen Dank im Voraus.

+0

Ich ging durch http://stackoverflow.com/questions/33284507/converting-a-scala-iterabletuple-to-rdd?rq=1 aber sparkContext.parallelize irgendwie versucht, mehrere zu erstellen sparkContext und verursacht somit eine Ausnahme. Ich möchte einzelne Instanz von sparkContext in meiner Anwendung behalten. –

+0

Warum brauchen Sie das? –

+0

Es ist nicht möglich. Sie können RDDs nicht verschachteln. – zero323

Antwort

2

Sie können dies erreichen, indem Sie flatMap und reduceByKey verwenden. Etwas wie folgt aus:

rdd.flatMap{case(key, list) => list.map(item => ((key,item._1), item._2))} 
    .reduceByKey(_+_) 
    .map{case((key,name),hours) => (key, List((name, hours)))} 
    .reduceByKey(_++_) 
+0

Es Es hat sich wie ein Zauberspruch bewährt, danke tonn. Da ich neu bei scala bin, kann ich diese Lösung verstehen, aber nicht sehr deutlich. Einige der Fragen, die ich habe, schließen ein: Was macht hier "case"? in der letzten tatsächlich Elemente zur Liste für das gleiche Präfix hinzufügen? Und das letzte ist: Wie kann ich sortieren die resultierende? –

+0

'case' ist nur normale Mustererkennung,' ++ 'hängt zwei Listen, und Sie können' sortBy' verwenden um das Ergebnis zu sortieren :) –

+0

@Puneet Chuarasia hat meine Antwort bearbeitet, die es eigentlich falsch gemacht hat (!!). Die Klammern um "(name, hours)" _ARE_ werden benötigt, da sie einer Liste von Tupeln zugeordnet werden müssen, nicht einer Liste vom Typ Any. Ich habe die Antwort zurück zu meiner ursprünglich richtigen Antwort geändert. –

Verwandte Themen