4

Ich habe einige Daten, die ich nach einer bestimmten Spalte gruppieren möchte, dann eine Reihe von Feldern basierend auf einem rollierenden Zeitfenster aus der Gruppe aggregieren. HierWie Aggregieren über Rolling-Zeit-Fenster mit Gruppen in Spark

sind einige Beispieldaten:

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1), 
          Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2), 
          Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3), 
          Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3), 
          Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3), 
          Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)]) 

ich von group_by gruppieren mag, dann Zeitfenster erstellen, die zum frühestmöglichen Zeitpunkt beginnen und verlängern, bis es für die Gruppe 30 Tage ohne Eintrag sind. Nach diesen 30 Tagen beginnt das nächste Zeitfenster mit dem Datum der nächsten Zeile, die nicht in das vorherige Fenster fällt.

Ich möchte dann aggregieren, zum Beispiel den Durchschnitt von get_avg, und das erste Ergebnis von get_first bekommen.

So sollte die Ausgabe für dieses Beispiel sein:

group_by first date of window get_avg get_first 
group1  2016-01-01    5  1 
group2  2016-02-01    20  3 
group2  2016-04-02    8  4 

edit: sorry ich meine Frage klar nicht richtig angegeben wurde. Ich möchte eigentlich ein Fenster, das nach 30 Tagen Inaktivität endet. Ich habe den group2-Teil des Beispiels entsprechend geändert.

Antwort

9

Überarbeitete Antwort:

können Sie eine einfache Funktionen hier Trick-Fenster verwenden. Ein Bündel von Importen:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_ 
from pyspark.sql.window import Window 

Fensterdefinition:

w = Window.partitionBy("group_by").orderBy("date") 

Guss date-DateType:

df_ = df.withColumn("date", col("date").cast("date")) 

definieren folgende Ausdrücke:

# Difference from the previous record or 0 if this is the first one 
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0)) 

# 0 if diff <= 30, 1 otherwise 
indicator = (diff > 30).cast("integer") 

# Cumulative sum of indicators over the window 
subgroup = sum_(indicator).over(w).alias("subgroup") 

hinzufügenAusdruck auf den Tisch:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg") 
+--------+--------+------------+ 
|group_by|subgroup|avg(get_avg)| 
+--------+--------+------------+ 
| group1|  0|   5.0| 
| group2|  0|  20.0| 
| group2|  1|   8.0| 
+--------+--------+------------+ 

first ist mit Aggregationen nicht sinnvoll, aber wenn Spalte monoton wächst können Sie min verwenden. Andernfalls müssen Sie auch Fensterfunktionen verwenden.

Getestet mit Spark 2.1. Kann Unterabfragen und Window Instanz erfordern, wenn sie mit früheren Spark-Versionen verwendet werden.

Die ursprüngliche Antwort (nicht relevant im angegebenen Bereich)

Da Spark-2.0 Sie sollten a window function verwenden können:

Bucketize Zeilen in eine oder mehrere Zeit Fenster gegeben eine Zeitstempel spezifizierende Spalte. Fensterstarts sind inklusive, aber die Fensterenden sind exklusiv, z. 12:05 wird im Fenster sein [12: 05,12: 10], aber nicht in [12: 00,12: 05].

from pyspark.sql.functions import window 

df.groupBy(window("date", windowDuration="30 days")).count() 

aber Sie aus dem Ergebnis sehen können,

+---------------------------------------------+-----+ 
|window          |count| 
+---------------------------------------------+-----+ 
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1 | 
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2 | 
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1 | 
+---------------------------------------------+-----+ 

Sie ein bisschen vorsichtig sein müssen, wenn es um Zeitzonen kommt.

Verwandte Themen