2016-04-08 16 views
0

Erstens bin ich ein Neuling für Spark und Python. Ich versuche, eine RDD (resilient verteilte Datenmenge) in eine andere zu transformieren.PySpark IPython - reduzieren RDD in eine neue RDD mit anderen Schlüssel

Der Eingang RDD ist:

(u'Task1', (u'James', 10)), 
(u'Task1', (u'James', 15)), 
(u'Task1', (u'James', 18)), 
(u'Task1', (u'James', 11)), 
(u'Task1', (u'Oliver', 10)), 
(u'Task1', (u'Oliver', 15)), 
(u'Task2', (u'Oliver', 18)), 
(u'Task2', (u'Oliver', 11)), 

Nun Ich versuche, eine Funktion zu erstellen, die die Summe der Stunden für jede Person gibt, ganz gleich der Aufgabe:

def extract_time_tracking(time_bookings): 
    ??? 
    return (person, total_hours) 
time_trackings_sum = input_RDD.???(extract_time_tracking) 

Der Ausgang sollte sein:

(u'James', 54), # has been working on Task1 only 
(u'Oliver', 54), # has been working on Task1 and Task2 

Ich benutze PySpark IPython. Ich habe an combineByKey oder reduceByKey gedacht, aber sie verwenden immer den gleichen Schlüssel. Aber in meinem Fall unterscheidet sich die resultierende Taste von der Eingabetaste?!?!?

Danke für jede Hilfe.

Antwort

1

Verwenden Sie die Funktion map, um das Objekt so zu transformieren, dass das erste Element des Tupels der gewünschte Schlüssel ist. Da Sie sich nicht um die Aufgabe kümmern, können Sie sie tatsächlich vollständig löschen.

input_RDD.map(lambda x: x[1]).reduceByKey(lambda x,y: x+y) 

Wenn später Sie wollen einfach die Taste verschieben, würden Sie eine kompliziertere Karte tun:

input_RDD.map(lambda x: (x[1][0],(x[0],x[1][1])) 
+0

Könnten Sie das erste Lambda in der Map-Funktion ein bisschen mehr erklären? Warum x [1]? das Tupel wäre Aufgabe, Matthias

+1

@Matthias Angenommen x = (u'Task1 ', (u'James', 10)), x [0] wird u'Task1 'und x [1] wird (u'James ', 10) mit der [normalen Tupel-Funktionalität] (https://docs.python.org/2/tutorial/datastructures.html#tuples-and-sequences). Da wir die Stunden nach dem Namen berechnen wollen, sind das die zwei Informationen, die wir brauchen. (Weitere Kommentare kommen in anderen Teilen dieses Problems.) –

+0

Der Weg über 'map' nachzudenken ist, dass RDDs mit einer beliebigen Funktion, die Sie schreiben, transformiert wird. Wenn Sie mit einer RDD beginnen, deren Objekte den Typ X haben und Sie den Typ Y haben möchten, schreiben Sie eine Map, die ein X in ein Y umwandelt und Sie dann Zeile für Zeile parallel anwenden. Dieses Problem benötigt nur eine einfache Transformation - werfen Sie die Aufgabe weg. Nehmen wir an, wir wollten stattdessen nach Aufgaben summieren, und wir haben diese Namen im Weg. Dann machen wir folgendes: 'input_RDD.map (Lambda x: (x [0], x [1] [1]). ReduceByKey (Lambda x, y: x + y)' –

0
def extract_time_tracking(time_bookings): 
val splits = rec.split(",") 
val person = splits(1).replaceAll(" \\(u'", "").replaceAll("'", "") 
val total_hours = splits(2).replaceAll("\\)", "").trim().toInt 
return (person, total_hours) 


input_RDD.map(extract_time_tracking).reduceByKey 

Ich benutze scala, also überprüfen Sie bitte die Syntax.