2016-08-19 34 views
2

Ich habe ein RDD wie folgt aus:Wie gruppieren und summieren sich in Spark?

{"key1" : "fruit" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "US" , "key3" : "2" } 

{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" } 

{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" } 

Mein Ziel von Schlüssel1 zu erste Gruppe ist und dann die Gruppe von key2 und schließlich key3 hinzufügen.

Ich erwarte Endergebnisses wie

key1   key2  key3 
"fruit"  , "US" , 3 
"vegetable" , "US" , 1 
"fruit"  , "Japan" , 3 
"vegetable" , "Japan" , 3 

wie unten Mein Code,

beginnt
rdd_arm = rdd_arm.map(lambda x: x[1]) 

rdd_arm beinhaltet die oben genannten Schlüssel: Wert-Format.

Ich bin mir nicht sicher, wohin ich als nächstes gehen soll. Kann mir jemand helfen?

Antwort

1

Lassen Sie uns Ihre RDD erstellen:

In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }]) 
In [2]: rdd_arm.collect() 
Out[2]: 
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'US', 'key3': '2'}, 
{'key1': 'vegetable', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'Japan', 'key3': '3'}, 
{'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}] 

Zuerst Sie einen neuen Schlüssel erstellen haben, die das Paar von key1 und key2 sein wird. Der Wert wird key3, so dass Sie so etwas wie dies tun wollen:

In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])) 

In [4]: new_rdd.collect() 
Out[4]: 
[('fruit, US', '1'), 
('fruit, US', '2'), 
('vegetable, US', '1'), 
('fruit, Japan', '3'), 
('vegetable, Japan', '3')] 

Dann wollen wir die Werte der Schlüssel addieren, die Duplikate sind, werden einfach reduceByKey() Aufruf wie folgt aus:

In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b)) 

In [6]: new_rdd.collect() 
Out[6]: 
[('fruit, US', 3), 
('fruit, Japan', '3'), 
('vegetable, US', '1'), 
('vegetable, Japan', '3')] 

und wir sind fertig!


Natürlich könnte dies Einzeiler, wie dies:

new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b)) 
+1

Hallo gsamaras. Danke für das Follow-up. –

2

Ich löste es selbst.

Ich musste einen Schlüssel einschließlich mehrerer Schlüssel erstellen und dann addieren.

rdd_arm.map(lambda x : x[0] + ", " + x[1] , x[2]).reduceByKey(lambda a,b : a + b) 

Die folgende Frage war nützlich.

How to group by multiple keys in spark?

+0

mir sagen Lassen Sie zu, dass dies für mich nicht funktioniert hat, war ich Fehler undefinierten Namen bekommen, und nach dem Aufstehen ich konnte sie nicht erreichen. Als Ergebnis habe ich eine neue Antwort gepostet, hoffe es gefällt euch! Ich habe die Frage jedoch aufgefrischt, da ich dadurch üben konnte! Vielen Dank! – gsamaras

Verwandte Themen