2017-10-14 4 views
0

Ich versuche, eine Verknüpfung zwischen zwei RDDs mit der ersten Spalte als Schlüssel durchzuführen. Die RDDs sehen so aus:Funke reducebykey und ignorieren Rest

RDD1: 
(k1,(s11,s12,s13)) 
(k2,(s21,s22,s23)) 
(k3,(s31,s32,s33)) 
... 

RDD2: 
(k1,(t11,t12,t13)) 
(k2,(t21,t22,t23)) 
(k4,(t41,t42,t43)) 
... 

Ki von einer RDD kann oder kann nicht eine Übereinstimmung von der anderen finden. Wenn es jedoch eine Übereinstimmung findet, wird es mit nur einer Zeile der anderen RDD übereinstimmen. Mit anderen Worten, ki sind Primärschlüssel für beide RDDs. von

RDD3=RDD1.union(RDD2).reduceByKey(lambda x,y:(x+y)).filter(lambda x:len(x[1])==6) 

ich das tue Die resultierende RDD würde wie folgt aussehen:

RDD3: 
(k1,(s11,s12,s13,t11,t12,t13)) 
(k2,(s21,s22,s23,t21,t22,t23)) 
... 

I filter-Funktion während der Berechnung RDD3 vermeiden wollen. Es sieht wie eine vermeidbare Berechnung aus. Ist es möglich, dies mit eingebauten Funkenfunktionen zu tun? Ich will nicht funken SQL verwenden oder Datenrahmen

Antwort

1

Sie benötigen die join Methode, gefolgt von einer mapValues Methode Werte aus dem gleichen Schlüssel zu verketten:

rdd1.join(rdd2).mapValues(lambda x: x[0] + x[1]).collect() 
# [('k2', ('s21', 's22', 's23', 't21', 't22', 't23')), 
# ('k1', ('s11', 's12', 's13', 't11', 't12', 't13'))] 
Verwandte Themen