2017-10-05 8 views
1

Ich versuche, eine neue Spalte von Listen in PYSPARK zu erstellen, indem ich eine groupby-Aggregation für vorhandene Spaltensätze verwende. Ein Beispiel Eingangsdatenrahmen wird unten bereitgestellt:collect_list durch Beibehalten der Reihenfolge basierend auf einer anderen Variablen

------------------------ 
id | date  | value 
------------------------ 
1 |2014-01-03 | 10 
1 |2014-01-04 | 5 
1 |2014-01-05 | 15 
1 |2014-01-06 | 20 
2 |2014-02-10 | 100 
2 |2014-03-11 | 500 
2 |2014-04-15 | 1500 

Die erwartete Ausgabe ist:

id | value_list 
------------------------ 
1 | [10, 5, 15, 20] 
2 | [100, 500, 1500] 

Die Werte innerhalb einer Liste werden nach dem Datum sortiert.

Ich versuchte collect_list wie folgt verwendet:

from pyspark.sql import functions as F 
ordered_df = input_df.orderBy(['id','date'],ascending = True) 
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value")) 

Aber collect_list garantiert nicht, um selbst wenn ich den Eingangsdatenrahmen nach dem Datum vor der Aggregation sortieren.

Könnte jemand bei der Aggregation helfen, indem er die Reihenfolge basierend auf einer zweiten (Datums-) Variable beibehält?

Antwort

6

Wenn Sie sowohl Datumsangaben als auch Werte als Liste erfassen, können Sie die Ergebnisspalte nach dem Datum unter Verwendung von und udf sortieren und dann nur die Werte im Ergebnis beibehalten.

import operator 
import pyspark.sql.functions as F 

# create list column 
grouped_df = input_df.groupby("id") \ 
       .agg(F.collect_list(F.struct("date", "value")) \ 
       .alias("list_col")) 

# define udf 
def sorter(l): 
    res = sorted(l, key=operator.itemgetter(0)) 
    return [item[1] for item in res] 

sort_udf = F.udf(sorter) 

# test 
grouped_df.select("id", sort_udf("list_col") \ 
    .alias("sorted_list")) \ 
    .show(truncate = False) 
+---+----------------+ 
|id |sorted_list  | 
+---+----------------+ 
|1 |[10, 5, 15, 20] | 
|2 |[100, 500, 1500]| 
+---+----------------+ 
+0

Danke für die ausführliche Beispiel ... Ich habe gerade versucht es auf einem größeren Daten von ein paar Millionen, und ich bin genau die gleiche Sequenz wie die von collect_list bekommen ... Gibt es eine Möglichkeit, dies könnte zu erklären, warum passiert sein? Außerdem, überprüft, dass collect_list nur diese Fälle mit mehreren Werten innerhalb eines Datums zu vermasseln scheint ... Bedeutet es, dass collect_list auch die Reihenfolge beibehält? – Ravi

+1

In Ihrem Code sortieren Sie den gesamten Datensatz vor collect_list(), also ja. Dies ist jedoch nicht notwendig. Es ist effizienter, die resultierende Liste von Tupeln nach dem Sammeln von Datum und Wert in einer Liste zu sortieren. – mtoto

+0

Nur um zu verdeutlichen ... Sortierung der Spalte und Verwendung von Collect_list in der sortierten Spalte würde die Reihenfolge beibehalten? – Ravi

Verwandte Themen