2017-04-25 5 views
1

ich mit mir einen dict Typ RDD haben:PySpark: Iterations über dict Typ RDD

>>> a.collect() 

[{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]

einfach zu überprüfen:

>>> a.map(lambda x:type(x)).collect() 

[< type 'dict' >]

Allerdings bin ich nicht in der Lage zu iterieren Überdi dict RDD mit map(). Ich habe versucht:

>>> a.map(lambda x:(k,v) for k,v in x.iteritems()) 

Zu meiner völligen Überraschung es zum Fehler:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
NameError: name 'x' is not defined 

Bin ich hier jeden wichtigen Punkt fehlt.

EDIT: Code ist alles in Ordnung kleine Fehler zu Generator Syntax sollte korrekter Code im Zusammenhang Sperre sein:

a.map(lambda x:[(k,v) for k,v in x.iteritems()]) 

Antwort

1

ich das versucht:

data = [{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, 
     (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}] 

rdd = sc.parallelize(data) 
rdd.flatMap(lambda _: [(k,v) for (k,v) in _.items()]).collect() 

und erhielt diesen:

[((1155718, 105), 14), 
((1155718, 738), 4), 
((1155718, 2904), 38), 
((1155718, 1887), 2), 
((1155718, 1196), 6), 
((1155718, 1930), 12), 
((1155718, 927), 6), 
((1155718, 2783), 8), 
((1155718, 997), 4), 
((1155718, 952), 4)] 
+0

Du hast recht, eigentlich sollte mein Code 'a.map sein (Lambda x: [(k, v) für k, v in x.iteritems()])' – abhiieor

+0

Nur um hier zu notieren: für Python 3 mit 'items()'. Für Python 2 mit 'iteritems()' – titipata