2017-01-31 4 views
3

Ich habe zwei Dateien in einem Funken-Cluster, foo.csv und bar.csv, beide mit 4 Spalten und die gleichen genauen Felder: time, user, url, category.pyspark: Eine RDD basierend auf bestimmten Spalten einer anderen RDD filtern

Ich möchte die foo.csv, durch bestimmte Spalten von bar.csv herausfiltern. Am Ende möchte ich Schlüssel/Wert-Paare von (Benutzer, Kategorie): [Liste, von, URLs]. Zum Beispiel:

foo.csv: 
11:50:00, 111, www.google.com, search 
11:50:00, 222, www.espn.com, news 
11:50:00, 333, www.reddit.com, news 
11:50:00, 444, www.amazon.com, store 
11:50:00, 111, www.bing.com, search 
11:50:00, 222, www.cnn.com, news 
11:50:00, 333, www.aol.com, news 
11:50:00, 444, www.jet.com, store 
11:50:00, 111, www.yahoo.com, search 
11:50:00, 222, www.bbc.com, news 
11:50:00, 333, www.nytimes.com, news 
11:50:00, 444, www.macys.com, store 

bar.csv: 
11:50:00, 222, www.bbc.com, news 
11:50:00, 444, www.yahoo.com, store 

Ergebnis soll in:

{ 
(111, search):[www.google.com, www.bing.com, www.yahoo.com], 
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com] 
} 

Mit anderen Worten, wenn ein (Benutzer, Kategorie) Paar in bar.csv existiert, würde ich alle Linien in foo.csv heraus filtern mag wenn sie das gleiche Paar (Benutzer, Kategorie) haben. Daher möchte ich im obigen Beispiel alle Zeilen in foo.csv mit (222, news) und (444, store) entfernen. Letztendlich möchte ich, nachdem ich die gewünschten Zeilen entfernt habe, ein Wörterbuch mit Schlüssel/Wert-Paaren wie: (user, category): [list, of, urls].

Hier ist mein Code:

fooRdd = sc.textFile("file:///foo.txt/") 
barRdd = sc.textFile("file:///bar.txt/") 


parseFooRdd= fooRdd.map(lambda line: line.split(", ")) 
parseBarRdd = barRdd.map(lambda line: line.split(", ")) 



# (n[1] = user_id, n[3] = category_id) --> [n[2] = url] 
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])}) 
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])}) 

Der obige Code funktioniert und bekommt die Datensätze im Format Ich möchte:

(user_id, category): [all, urls, visited, by, user, in, that, category] 

jedoch paar Fragen: 1) Ich denke, es gibt eine Liste von Wörterbücher mit nur einem k/v-Paar und 2) Ich bin fest, was als nächstes zu tun ist. Ich weiß, was in Englisch zu tun ist: Holen Sie sich die Schlüssel in barGroupRdd (Tupel), und entfernen Sie alle Zeilen in fooGroupRdd, die den gleichen Schlüssel haben. Aber ich bin neu in Pyspark und ich habe das Gefühl, dass es Befehle gibt, die ich nicht ausnutze. Ich denke, dass mein Code optimiert werden kann. Zum Beispiel glaube ich nicht, dass ich diese barGroupRdd Zeile erstellen müsste, weil alles, was ich brauche, von bar.csv ist (user_id, Kategorie) - ich brauche kein Wörterbuch zu erstellen. Ich denke auch, ich sollte zuerst herausfiltern, und dann das Wörterbuch aus dem Ergebnis erstellen. Jede Hilfe oder Beratung wird geschätzt, danke!

Antwort

2

Sie sind wirklich ganz in der Nähe.

Statt dem für jeden RDD:

fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\ 
    n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])}) 

tun:

fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\ 
    n[2])).groupByKey().map(lambda x: [(x[0]), list(x[1])]) 

Auf diese Weise können Sie Zugriff auf eigentlich die Tasten mit dem rdd.keys() -Methode und erstellen Sie eine Liste bar_keys.

bar_keys = barGroupRdd.keys().collect() 

Dann können Sie genau das tun, was Sie gesagt haben. Filtern Sie die Zeilen in fooGroupRdd mit einem Schlüssel in bar_keys.

dict(fooGroupRdd.filter(lambda x: x[0] not in bar_keys)\ 
    .map(lambda x: [x[0], x[1]]).collect()) 

Das Endergebnis sieht wie folgt aus:

{('111', 'search'): ['www.google.com', 'www.bing.com', 'www.yahoo.com'], 
('333', 'news'): ['www.reddit.com', 'www.aol.com', 'www.nytimes.com']} 

Hoffnung, das hilft.

Pro Kommentar, fragte ich mich auch, ob dies die effizienteste Methode ist.Wenn Sie sich die Klassenmethoden für RDD ansehen, finden Sie collectAsMap(), die wie Collect funktioniert, aber statt einer Liste ein Wörterbuch zurückgibt. Nach der Untersuchung des Quellcodes macht die Methode jedoch genau das, was ich getan habe, also scheint es die beste Option zu sein.

+0

Danke, das ist es! Fühlt sich an, als ob es so viele Funkenfunktionen in der API gibt, die neu für mich sind, ich hoffe, dass dies die effizienteste Lösung ist. Dieser erste Ersatz hat mir sehr geholfen. –

Verwandte Themen