ich eine große csv mit einigen Social-Media-Daten:Halte ursprüngliche Struktur von Datenrahmen nach groupBy in PySpark
message_id, user_id, message, date
"1", "123", "some message blah blah", "Sun May 12 15:08:58 +0000 2013"
"2", "123", "another message blah", "Sun June 12 15:08:58 +0000 2013"
"3", "123", "i want this message removed", "Sun June 12 15:08:58 +0000 2013"
"4", "321", "more blah", "Mon June 12 15:08:58 +0000 2013"
und mag Nachrichten auf einigen Kriterien innerhalb einer Gruppe entfernen Basis (für dieses Beispiel kann die Gruppe sein user_id
Dies ist, was ich getan habe. erstellt eine Standardfunktion für meine Ausschlusskriterien definiert eine udf
basierend auf dieser Methode und dann die Funktion auf die gruppierten Daten gelten:
def exclusion_criteria(data_list):
keep = []
for d in data_list:
if some_condition:
keep.append(d)
return keep
myUdf = udf(exclusion_criteria, ArrayType(StringType()))
msgsDF = session.read.csv("data.csv", header=False)
filterMsgsDF = msgsDF.groupBy("user_id").agg(collect_list("message")
.alias("message")).withColumn("message",myUdf("message"))
Am Ende bekomme ich etwas, das wie folgt aussieht:
filterMsgsDF.take(1)
[Row(user_id='123', _c2=['some message blah blah', 'another message blah'])]
aber die Probleme sind, dass ich die Informationen mit jeder Nachricht zugeordnet bin Fallenlassen (message_id
und date
). Was ich am Ende wollen, ist so etwas wie
["1", "123", "some message blah blah", "Sun May 12 15:08:58 +0000 2013"]
["2", "123", "another message blah", "Sun June 12 15:08:58 +0000 2013"]
["4", "321", "more blah", "Mon June 12 15:08:58 +0000 2013"]
Gibt es eine Möglichkeit, diese andere Information zu verbinden oder sie während des Schritts groupBy/agg zu halten? Vielleicht groupBy
ist nicht der beste Weg, dies zu tun?
einfach die andere Informationen über Nachricht in derselben Liste anhängen. Ie Nachricht ID, Nachricht, Datum wird einzelne Liste sein. – StackPointer
Sorry, ich folge nicht. Wo hänge ich die anderen Informationen an? – Sal
[Zeile (user_id = '123', _c2 = [[123, 'einige Nachricht blah blah', Datum], [124, 'eine andere Nachricht blah', Datum]]]]] – StackPointer