2016-06-03 2 views
0

Ich versuche, eine Liste von RDDs an GroupWith übergeben, anstatt sie manuell per Index anzugeben.Wie führe ich die Liste der RDDs an groupWith in Pyspark

Hier ist die Beispieldaten

w = sc.parallelize([("1", 5), ("3", 6)]) 
x = sc.parallelize([("1", 1), ("3", 4)]) 
y = sc.parallelize([("2", 2), ("4", 3)]) 
z = sc.parallelize([("2", 42), ("4", 43), ("5", 12)]) 

Jetzt habe ich ein Array wie folgt erstellt.

m = [w,x,y,z] 

Die manuelle hartcodierte Weise ist

[(x, tuple(map(list, y))) for x, y in sorted(list(m[0].groupWith(m[1],m[2],m[3]).collect()))] 

die

unten Ergebnis druckt
[('1', ([5], [1], [], [])), 
('2', ([], [], [2], [42])), 
('3', ([6], [4], [], ])), 
('4', ([], [], [3], [43])), 
('5', ([], [], [], [12]))] 

Aber ich möchte so etwas wie Pass manuell vorbei m[1:] stattdessen tun.

[(x, tuple(map(list, y))) for x, y in sorted(list(m[0].groupWith(m[1:]).collect()))] 

Ich habe versucht, Klammern zu entfernen, aber es muss Zeichenfolge konvertiert werden und ich bekomme unter Fehler

AttributeError: 'list' object has no attribute 'mapValues' 

    AttributeError: 'str' object has no attribute 'mapValues' 
+0

Versuchen Sie, 'm [1:]' auszuführen. Es gibt parallelCollectionRDD [27] bei parallelize bei PythonRDD.scala aus: 423 ', nicht der tatsächliche Name der RDDs. Nicht sicher, wie Sie Ihr Problem lösen, aber es zumindest erklärt mit diesem Code funktioniert nicht – David

Antwort

0

Da groupWith akzeptiert varargs alles, was Sie tun müssen, ist, Argumente entpacken:

w.groupWith(*m[1:]) 
+1

Großartig. Das hat den Trick gemacht. Danke! Kann nicht ohne genügend Ansehen upvote sein. – veeragoni

Verwandte Themen