2015-04-18 7 views
37

Ich versuche, meine groupByKey ist, um herauszufinden, warum der Rückkehr folgende:PySpark groupByKey Rückkehr pyspark.resultiterable.ResultIterable

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)] 

Ich habe Werte flatMapped, die wie folgt aussehen:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')] 

I‘ m tun nur eine einfache:

Antwort

53

Was Sie zurück bekommen, ist ein Objekt, das Sie iterieren können r die Ergebnisse. Sie können die Ergebnisse von groupByKey in eine Liste umwandeln, indem Sie list() für die Werte aufrufen, z.

example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')]) 

example.groupByKey().collect() 
# Gives [(0, <pyspark.resultiterable.ResultIterable object ......] 

example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect() 
# Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])] 
+23

'example.groupByKey(). MapValues ​​(list) .collect()' ist kürzer und funktioniert auch –

+0

Wie kann ich den 'ResultIterable' Typ abbilden? – xxx222

15

Sie auch

example.groupByKey().mapValues(list) 
1

Anstelle der Verwendung von groupByKey() verwenden können, würde ich Ihnen vorschlagen cogroup() verwenden. Sie können das folgende Beispiel beziehen.

[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] 

Beispiel:

>>> x = sc.parallelize([("foo", 1), ("bar", 4)]) 
>>> y = sc.parallelize([("foo", -1)]) 
>>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] 
>>> print(z) 

Sie sollten die gewünschte Ausgabe erhalten ...

1

Beispiel:

r1 = sc.parallelize([('a',1),('b',2)]) 
r2 = sc.parallelize([('b',1),('d',2)]) 
r1.cogroup(r2).mapValues(lambdax:tuple(reduce(add,__builtin__.map(list,x)))) 

Ergebnis:

[('d', (2,)), ('b', (2, 1)), ('a', (1,))]