2016-10-31 4 views
1

Ich versuche, eine Kohortenstudie zu erstellen, um In-App-Benutzerverhalten zu verfolgen, und ich möchte fragen, wenn Sie eine Idee haben, wie ich Bedingungen in Pyspark angeben kann, wenn ich .join verwende () Gegeben:Innere Verbindung mit Pyspark für eine Kohortenstudie

rdd1 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8', 
    ((u'service1', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'2016-02-08', 
     u'2016-39', 
     u'2016-6', 
     u'2016-2', 
     '2016-10-19'), 
    (u'service2', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'1', 
     u'67.0', 
     u'2016-293', 
     u'2016-42', 
     u'2016-10', 
     '2016-10-19')))]) 


rdd2 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8', 
    ((u'serice1', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'2016-02-08', 
     u'2016-39', 
     u'2016-6', 
     u'2016-2', 
     '2016-10-20'), 
    (u'service2', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'10', 
     u'3346.0', 
     u'2016-294', 
     u'2016-42', 
     u'2016-10', 
     '2016-10-20')))]) 

Diese beiden RDDs Informationen über einen Benutzer, mit ‚6df99638e4584a618f92a9cfdf318cf8‘ als ID darstellen, und die auf Service 1 und service2 auf 2016.10.19 und 2016.10.20 angemeldet haben. Mein Ziel ist es, meine zwei Rds mit jeweils mindestens 20 000 Zeilen zu verbinden. Also muss es eine innere Verbindung sein. Das eigentliche Ziel ist es, alle Benutzer, die sich bereits am 19.10.2016 eingeloggt haben, zu erreichen und am 20.10.2016 ebenfalls eingeloggt zu sein. Genauer gesagt ist mein letztes Objekt, als Ergebnis, hier für rxemple, nach einem inneren Join, nur den Inhalt von rdd2 zu haben.

erwartete Ausgabe:

[(u'6df99638e4584a618f92a9cfdf318cf8', 
((u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')) 
) ] 

Eine einfache verbinden rdd1.join (RDD2) mir geben, logisch, ein RDD alle Paare von Elementen mit passenden, die beiden RDDs enthält. Ein leftOuterJoin oder ein rightOuterJoin passen auch nicht zu meinem Land, da ich einen inneren Join möchte (nur IDs, die bereits in rdd1 und rdd2 vorhanden sind).

Erwartete Ausgabe: Angenommen wir haben zwei Dicts: dict1 = {'a ':' Mann ',' b ': Frau,' c ':' Baby '} und dict2 = {' a ':' Zara ',' x ': Mango,' y ':' Celio '}. Die erwartete Ausgabe muss lauten: output_dict = {'a': 'Zara'}. 'a' (der Schlüssel) ist bereits in dict 1 vorhanden, und was ich will ist der Schlüssel, Wert von dict2!

Es versucht, dies zu tun:

rdd1.map(lambda (k, v) : k).join(rdd2) 

Dieser Code mir ein leeres rdd gibt.

enter image description here

Was tun? PS: Ich muss mit rdds, nicht mit Datenrahmen umgehen! Also ich möchte meine rdds nicht in DataFrames konvertieren: D Jede Hilfe wird geschätzt. Danke !

+0

was ist die erwartete Ausgabe? – Yaron

+0

@Yaron: [(u'6df99638e4584a618f92a9cfdf318cf8' , ((u'serice1' , u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A‘, u'2016-02-08' , u'2016-39' , u'2016-6' , u'2016-2' '2016.10.20'), (u'service2' , u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A‘, u '10', u'3346.0' , u'2016-294' , u'2016-42' , u'2016-10' '2016.10.20'))) ] – DataAddicted

+0

@Yaron : Der Inhalt von rdd2. Ich suche nach Benutzern, die in rdd1 (2016-10-19) und rdd2 (2016-10-20) vorhanden sind. – DataAddicted

Antwort

2

So suchen Sie einen Join von RDD1 und RDD2, die nur Schlüssel und Wert von RDD2 nehmen:

rdd_output = rdd1.join(rdd2).map(lambda (k,(v1,v2)):(k,v2)) 

Das Ergebnis ist:

print rdd_output.take(1) 

[(u'6df99638e4584a618f92a9cfdf318cf8', (
(u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20') 
))] 
+0

Es klingt gut! Danke @Yaron! – DataAddicted

Verwandte Themen