Ich versuche, Wortanzahl von einem CSV zu erhalten, wenn in einer anderen Spalte gruppiert. Mein csv hat drei Spalten: id, message und user_id. Ich dieses in lesen und teilen dann die Nachricht und speichern Sie eine Liste von Unigramme:Anwenden einer Funktion auf groupBy Daten mit pyspark
+-----------------+--------------------+--------------------+
| id| message| user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+
Als nächstes angesichts meiner Datenrahmen df
, möchte ich zu einer Gruppe von user_id
und dann zählt für jeden der Unigramme bekommen. Als einfaches ersten Durchgang habe ich versucht, durch user_id
und bekommen die Länge des gruppierten Nachrichtenfeld Gruppierung:
from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf
df = self.session.read.csv(self.corptable, header=True,
mode="DROPMALFORMED",)
# split my messages ....
# message is now ArrayType(StringType())
grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), ArrayType(StringType()))
grouped.agg(counter(df["message"]))
print(grouped.collect())
ich die folgende Fehlermeldung erhalten:
pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"
Nicht sicher, wie um diesen Fehler zu bekommen. Wie wendet man im Allgemeinen eine Funktion auf eine Spalte an, wenn man eine andere gruppiert? Muss ich immer eine benutzerdefinierte Funktion erstellen? Sehr neu in Spark.
bearbeiten: Hier ist, wie ich löste dies, da ein tokenizer in einer separaten Python-Datei:
group_field = "user_id"
message_field = "message"
context = SparkContext()
session = SparkSession\
.builder\
.appName("dlastk")\
.getOrCreate()
# add tokenizer
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))
df = session.read.csv("myFile.csv", header=True,)
df = df[group_field, message_field]
# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))
# create ngrams from tokenized messages
n = 1
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add)
# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency
flat = grouped.flatMap(lambda row: [[row[0], x,y, y/sum(row[1].values())] for x,y in row[1].items()])
# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")
Daten wie folgt aussieht:
# after read
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...| T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+
# after tokenize
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+
# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...
# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....
# after flat RDD to DF
+--------------------+---------+---+--------------------+
| _1| _2| _3| _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...| face| 3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...| was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...| how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+
Diese Antwort sagt: "Sie müssen eine Variante von agg .." anwenden, was ich tue. – Sal