2016-04-19 9 views
2

Ich bekomme diesen Fehler, aber ich weiß nicht warum. Grundsätzlich ich von diesem Code bin erroring:pyspark: 'PipelinedRDD' Objekt ist nicht iterierbar

a = data.mapPartitions(helper(locations)) 

wo Daten ein RDD und mein Helfer ist definiert als:

def helper(iterator, locations): 
     for x in iterator: 
      c = locations[x] 
      yield c 

(Standorte ist nur ein Array von Datenpunkten) Ich sehe nicht, Was das Problem ist, aber ich bin auch nicht der Beste in pyspark, also kann mir bitte jemand sagen, warum ich bekomme "PipelinedRDD" Objekt ist nicht iterierbar von diesem Code?

+0

Sie können nicht auf einem rdd in der Art und Weise wiederholen Sie tun. Bitte schauen Sie sich http://stackoverflow.com/questions/25914789/how-do-i-iterate-rdds-in-apache-spark-scala – Mohan

+0

@Mohan: danke Ich denke, ich bekomme die Idee jetzt, aber ich bin immer noch den gleichen Fehler bekommen. Ich rufe dies jetzt an: a = data.mapPartitions (Lambda-Iterator: Helfer (Iterator, Orte)). Was mache ich sonst falsch? – deeformvp

Antwort

1

RDD kann mithilfe von Map- und Lambda-Funktionen iteriert werden. Ich habe durch Pipelined RDD iteriert das unten stehende Methode

lines1 = sc.textFile("\..\file1.csv") 
lines2 = sc.textFile("\..\file2.csv") 

pairs1 = lines1.map(lambda s: (int(s), 'file1')) 
pairs2 = lines2.map(lambda s: (int(s), 'file2')) 

pair_result = pairs1.union(pairs2) 

pair_result.reduceByKey(lambda a, b: a + ','+ b) 

result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(','))) 
result_ll = [list(elem) for elem in result] 

---> result_ll = [list (Elem) für Elem in Folge] Typeerror: 'PipelinedRDD' Objekt nicht Iterable ist

Statt dessen ich ersetzte die Iteration Kartenfunktion

result_ll = result.map(lambda elem: list(elem)) 

Hoffnung mit diesem Code zu modifizieren hilft entsprechend

Verwandte Themen