2017-07-07 6 views
0

ich eine große csv mit einigen Social-Media-Daten:Halte ursprüngliche Struktur von Datenrahmen nach groupBy in PySpark

message_id, user_id, message, date 
"1", "123", "some message blah blah", "Sun May 12 15:08:58 +0000 2013" 
"2", "123", "another message blah", "Sun June 12 15:08:58 +0000 2013" 
"3", "123", "i want this message removed", "Sun June 12 15:08:58 +0000 2013" 
"4", "321", "more blah", "Mon June 12 15:08:58 +0000 2013" 

und mag Nachrichten auf einigen Kriterien innerhalb einer Gruppe entfernen Basis (für dieses Beispiel kann die Gruppe sein user_id

Dies ist, was ich getan habe. erstellt eine Standardfunktion für meine Ausschlusskriterien definiert eine udf basierend auf dieser Methode und dann die Funktion auf die gruppierten Daten gelten:

def exclusion_criteria(data_list): 
    keep = [] 
    for d in data_list: 
     if some_condition: 
      keep.append(d) 
    return keep 

myUdf = udf(exclusion_criteria, ArrayType(StringType())) 

msgsDF = session.read.csv("data.csv", header=False) 
filterMsgsDF = msgsDF.groupBy("user_id").agg(collect_list("message") 
    .alias("message")).withColumn("message",myUdf("message")) 

Am Ende bekomme ich etwas, das wie folgt aussieht:

filterMsgsDF.take(1) 
[Row(user_id='123', _c2=['some message blah blah', 'another message blah'])] 

aber die Probleme sind, dass ich die Informationen mit jeder Nachricht zugeordnet bin Fallenlassen (message_id und date). Was ich am Ende wollen, ist so etwas wie

["1", "123", "some message blah blah", "Sun May 12 15:08:58 +0000 2013"] 
["2", "123", "another message blah", "Sun June 12 15:08:58 +0000 2013"] 
["4", "321", "more blah", "Mon June 12 15:08:58 +0000 2013"] 

Gibt es eine Möglichkeit, diese andere Information zu verbinden oder sie während des Schritts groupBy/agg zu halten? Vielleicht groupBy ist nicht der beste Weg, dies zu tun?

+0

einfach die andere Informationen über Nachricht in derselben Liste anhängen. Ie Nachricht ID, Nachricht, Datum wird einzelne Liste sein. – StackPointer

+0

Sorry, ich folge nicht. Wo hänge ich die anderen Informationen an? – Sal

+0

[Zeile (user_id = '123', _c2 = [[123, 'einige Nachricht blah blah', Datum], [124, 'eine andere Nachricht blah', Datum]]]]] – StackPointer

Antwort

0

Etwas wie:

filterMsgsDF = msgsDF.withColumn('message_list', collect_list(msgsDF['message']).over(Window.partitionBy('user_id'))) 

Ausgang:

+----------+-------+---------------------------+-------------------------------+---------------------------------------------------------------------------+ 
|message_id|user_id|message     |date       |message_list                | 
+----------+-------+---------------------------+-------------------------------+---------------------------------------------------------------------------+ 
|1   |123 |some message blah blah  |Sun May 12 15:08:58 +0000 2013 |[some message blah blah, another message blah, i want this message removed]| 
|2   |123 |another message blah  |Sun June 12 15:08:58 +0000 2013|[some message blah blah, another message blah, i want this message removed]| 
|3   |123 |i want this message removed|Sun June 12 15:08:58 +0000 2013|[some message blah blah, another message blah, i want this message removed]| 
|4   |321 |more blah     |Mon June 12 15:08:58 +0000 2013|[more blah]                | 
+----------+-------+---------------------------+-------------------------------+---------------------------------------------------------------------------+ 
Verwandte Themen