Ich habe zwei Dateien in einem Funken-Cluster, foo.csv
und bar.csv
, beide mit 4 Spalten und die gleichen genauen Felder: time, user, url, category
.pyspark: Eine RDD basierend auf bestimmten Spalten einer anderen RDD filtern
Ich möchte die foo.csv
, durch bestimmte Spalten von bar.csv
herausfiltern. Am Ende möchte ich Schlüssel/Wert-Paare von (Benutzer, Kategorie): [Liste, von, URLs]. Zum Beispiel:
foo.csv:
11:50:00, 111, www.google.com, search
11:50:00, 222, www.espn.com, news
11:50:00, 333, www.reddit.com, news
11:50:00, 444, www.amazon.com, store
11:50:00, 111, www.bing.com, search
11:50:00, 222, www.cnn.com, news
11:50:00, 333, www.aol.com, news
11:50:00, 444, www.jet.com, store
11:50:00, 111, www.yahoo.com, search
11:50:00, 222, www.bbc.com, news
11:50:00, 333, www.nytimes.com, news
11:50:00, 444, www.macys.com, store
bar.csv:
11:50:00, 222, www.bbc.com, news
11:50:00, 444, www.yahoo.com, store
Ergebnis soll in:
{
(111, search):[www.google.com, www.bing.com, www.yahoo.com],
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com]
}
Mit anderen Worten, wenn ein (Benutzer, Kategorie) Paar in bar.csv
existiert, würde ich alle Linien in foo.csv
heraus filtern mag wenn sie das gleiche Paar (Benutzer, Kategorie) haben. Daher möchte ich im obigen Beispiel alle Zeilen in foo.csv
mit (222, news)
und (444, store)
entfernen. Letztendlich möchte ich, nachdem ich die gewünschten Zeilen entfernt habe, ein Wörterbuch mit Schlüssel/Wert-Paaren wie: (user, category): [list, of, urls]
.
Hier ist mein Code:
fooRdd = sc.textFile("file:///foo.txt/")
barRdd = sc.textFile("file:///bar.txt/")
parseFooRdd= fooRdd.map(lambda line: line.split(", "))
parseBarRdd = barRdd.map(lambda line: line.split(", "))
# (n[1] = user_id, n[3] = category_id) --> [n[2] = url]
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
Der obige Code funktioniert und bekommt die Datensätze im Format Ich möchte:
(user_id, category): [all, urls, visited, by, user, in, that, category]
jedoch paar Fragen: 1) Ich denke, es gibt eine Liste von Wörterbücher mit nur einem k/v-Paar und 2) Ich bin fest, was als nächstes zu tun ist. Ich weiß, was in Englisch zu tun ist: Holen Sie sich die Schlüssel in barGroupRdd
(Tupel), und entfernen Sie alle Zeilen in fooGroupRdd, die den gleichen Schlüssel haben. Aber ich bin neu in Pyspark und ich habe das Gefühl, dass es Befehle gibt, die ich nicht ausnutze. Ich denke, dass mein Code optimiert werden kann. Zum Beispiel glaube ich nicht, dass ich diese barGroupRdd
Zeile erstellen müsste, weil alles, was ich brauche, von bar.csv
ist (user_id, Kategorie) - ich brauche kein Wörterbuch zu erstellen. Ich denke auch, ich sollte zuerst herausfiltern, und dann das Wörterbuch aus dem Ergebnis erstellen. Jede Hilfe oder Beratung wird geschätzt, danke!
Danke, das ist es! Fühlt sich an, als ob es so viele Funkenfunktionen in der API gibt, die neu für mich sind, ich hoffe, dass dies die effizienteste Lösung ist. Dieser erste Ersatz hat mir sehr geholfen. –