2016-12-05 5 views
4

Ich versuche, Wortanzahl von einem CSV zu erhalten, wenn in einer anderen Spalte gruppiert. Mein csv hat drei Spalten: id, message und user_id. Ich dieses in lesen und teilen dann die Nachricht und speichern Sie eine Liste von Unigramme:Anwenden einer Funktion auf groupBy Daten mit pyspark

+-----------------+--------------------+--------------------+ 
|    id|    message|    user_id| 
+-----------------+--------------------+--------------------+ 
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...| 
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...| 
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...| 
+-----------------+--------------------+--------------------+ 

Als nächstes angesichts meiner Datenrahmen df, möchte ich zu einer Gruppe von user_id und dann zählt für jeden der Unigramme bekommen. Als einfaches ersten Durchgang habe ich versucht, durch user_id und bekommen die Länge des gruppierten Nachrichtenfeld Gruppierung:

from collections import Counter 
from pyspark.sql.types import ArrayType, StringType, IntegerType 
from pyspark.sql.functions import udf 

df = self.session.read.csv(self.corptable, header=True, 
     mode="DROPMALFORMED",) 

# split my messages .... 
# message is now ArrayType(StringType()) 

grouped = df.groupBy(df["user_id"]) 
counter = udf(lambda l: len(l), ArrayType(StringType())) 
grouped.agg(counter(df["message"])) 
print(grouped.collect()) 

ich die folgende Fehlermeldung erhalten:

pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;" 

Nicht sicher, wie um diesen Fehler zu bekommen. Wie wendet man im Allgemeinen eine Funktion auf eine Spalte an, wenn man eine andere gruppiert? Muss ich immer eine benutzerdefinierte Funktion erstellen? Sehr neu in Spark.

bearbeiten: Hier ist, wie ich löste dies, da ein tokenizer in einer separaten Python-Datei:

group_field = "user_id" 
message_field = "message" 

context = SparkContext() 
session = SparkSession\ 
     .builder\ 
     .appName("dlastk")\ 
     .getOrCreate() 

# add tokenizer 
context.addPyFile(tokenizer_path) 
from tokenizer import Tokenizer 
tokenizer = Tokenizer() 
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType())) 

df = session.read.csv("myFile.csv", header=True,) 
df = df[group_field, message_field] 

# tokenize the message field 
df = df.withColumn(message_field, spark_tokenizer(df[message_field])) 

# create ngrams from tokenized messages 
n = 1 
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add) 

# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency 
flat = grouped.flatMap(lambda row: [[row[0], x,y, y/sum(row[1].values())] for x,y in row[1].items()]) 

# rdd -> DF 
flat = flat.toDF() 
flat.write.csv("myNewCSV.csv") 

Daten wie folgt aussieht:

# after read 
+--------------------+--------------------+ 
|    user_id|    message| 
+--------------------+--------------------+ 
|00035fb0dcfbeaa8b...|To the douchebag ...| 
|00035fb0dcfbeaa8b...| T minus 1 week...| 
|00035fb0dcfbeaa8b...|Last full day of ...| 
+--------------------+--------------------+ 

# after tokenize 
+--------------------+--------------------+ 
|    user_id|    message| 
+--------------------+--------------------+ 
|00035fb0dcfbeaa8b...|[to, the, doucheb...| 
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...| 
|00035fb0dcfbeaa8b...|[last, full, day,...| 
+--------------------+--------------------+ 

# grouped: after 1grams extracted and Counters added 
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ... 

# flat: after calculating sum and relative frequency for each 1gram 
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] .... 

# after flat RDD to DF 
+--------------------+---------+---+--------------------+ 
|     _1|  _2| _3|     _4| 
+--------------------+---------+---+--------------------+ 
|00035fb0dcfbeaa8b...|  face| 3| 3.20547066994337E-4| 
|00035fb0dcfbeaa8b...|  was| 26|0.002778074580617587| 
|00035fb0dcfbeaa8b...|  how| 22|0.002350678491291...| 
+--------------------+---------+---+--------------------+ 
+0

Diese Antwort sagt: "Sie müssen eine Variante von agg .." anwenden, was ich tue. – Sal

Antwort

6

Eine natürliche Annäherung könnte sein, die Wörter in eine Liste zu gruppieren, und die Verwenden Sie die Python-Funktion Counter(), um Wortzählungen zu generieren. Für beide Schritte verwenden wir udf 's. Erstens ist die eine, die die verschachtelte Liste von collect_list() mehrerer Arrays resultierende abflachen wird:

unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist] 
) 

Zweitens, eine, die die Wortanzahl Tupeln, oder in unserem Fall struct ‚s erzeugt:

from pyspark.sql.types import * 
from collections import Counter 

# We need to specify the schema of the return object 
schema_count = ArrayType(StructType([ 
    StructField("word", StringType(), False), 
    StructField("count", IntegerType(), False) 
])) 

count_udf = udf(
    lambda s: Counter(s).most_common(), 
    schema_count 
) 

Dass sie alle zusammen:

from pyspark.sql.functions import collect_list 

(df.groupBy("id") 
.agg(collect_list("message").alias("message")) 
.withColumn("message", unpack_udf("message")) 
.withColumn("message", count_udf("message"))).show(truncate = False) 
+-----------------+------------------------------------------------------+ 
|id    |message            | 
+-----------------+------------------------------------------------------+ 
|10100718890699676|[[oecd,1], [the,1], [with,1], [at,1]]     | 
|10100720363468236|[[what,3], [me,1], [sad,1], [to,1], [does,1], [the,1]]| 
+-----------------+------------------------------------------------------+ 

Daten:

df = sc.parallelize([(10100720363468236,["what", "sad", "to", "me"]), 
        (10100720363468236,["what", "what", "does", "the"]), 
        (10100718890699676,["at", "the", "oecd", "with"])]).toDF(["id", "message"]) 
+0

Schön, scheint ähnlich zu dem, was ich letzte Nacht kam. Ich frage mich, welcher ist effizienter? – Sal

+0

können Sie das 'time' Paket versuchen, um die Ausführungszeit zu messen. am Anfang des Codeblocks: 'start_time = time.time()', am Ende: 'print (" Ausführungszeit ---% s Sekunden --- "% (time.time() - start_time))' – mtoto

1

Versuchen:

from pyspark.sql.functions import * 

df.withColumn("word", explode("message")) \ 
    .groupBy("user_id", "word").count() \ 
    .groupBy("user_id") \ 
    .agg(collect_list(struct("word", "count"))) 
Verwandte Themen