2017-08-21 7 views
5

Ich habe ein Dataset bestehend aus einer Timestamp-Spalte und einer Dollar-Spalte. Ich möchte die durchschnittliche Anzahl von Dollar pro Woche finden, die am Zeitstempel jeder Zeile endet. Ich schaute zuerst auf die Funktion pyspark.sql.functions.window, aber das Bins die Daten nach Woche.pyspark: rollender Durchschnitt mit Zeitreihendaten

Hier ist ein Beispiel:

%pyspark 
import datetime 
from pyspark.sql import functions as F 

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"]) 
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp')) 

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg')) 
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect() 

Daraus ergeben sich zwei Datensätze:

|  start  |   end   | avg | 
|---------------------|----------------------|-----| 
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0| 
|---------------------|----------------------|-----| 
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0| 
|---------------------|----------------------|-----| 

Die Fensterfunktion binned die Zeitreihendaten, anstatt einen gleitenden Durchschnitt durchgeführt wird.

Gibt es eine Möglichkeit, einen gleitenden Durchschnitt zu führen, wo ich mit einer Zeitperiode für jede Zeile zurück endet am timestampGMT der Reihe einen wöchentlichen Durchschnitt bekommen?

EDIT:

Zhangs Antwort unten ist in der Nähe zu dem, was ich will, aber nicht genau das, was ich würde gerne sehen.

Hier ist ein besseres Beispiel zu zeigen, was ich versuche zu bekommen:

%pyspark 
from pyspark.sql import functions as F 
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days")))) 

Daraus ergibt sich die folgende Datenrahmen:

dollars timestampGMT   rolling_average 
25  2017-03-18 11:27:18.0 25 
17  2017-03-10 15:27:18.0 15 
13  2017-03-15 12:27:18.0 15 

ich den Durchschnitt über die sein möchten Woche, die das Datum in der timestampGMT-Spalte fortsetzt, was dazu führen würde:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17 
13  2017-03-15 12:27:18.0 15 
25  2017-03-18 11:27:18.0 19 

In der obigen Resu lts, das rolling_average für 2017-03-10 ist 17, da es keine vorhergehenden Datensätze gibt. Der rolling_average für 2017-03-15 ist 15, weil er den 13 vom 2017-03-15 und den 17 vom 2017-03-10 mittelt, der in das vorhergehende 7-Tage-Fenster fällt. Der gleitende Durchschnitt für 2017-03-18 ist 19, weil er die 25 vom 2017-03-18 und die 13 vom 2017-03-10, die in das vorhergehende 7-Tage-Fenster fällt, und die 17 von 2017 nicht berücksichtigt -03-10, weil das nicht in das vorhergehende 7-Tage-Fenster fällt.

Gibt es eine Möglichkeit, dies zu tun, anstatt das Binning-Fenster, wo die wöchentlichen Fenster nicht überlappen?

Antwort

4

ich den richtigen Weg gefunden, ein sich bewegendes/gleitender Durchschnitt mit dieser Stackoverflow zu berechnen:

Spark Window Functions - rangeBetween dates

Die Grundidee besteht darin, Ihre Timestamp-Spalte in secon zu konvertieren ds, und Sie können die rangeBetween-Funktion in der pyspark.sql.Window-Klasse verwenden, um die richtigen Zeilen in Ihrem Fenster einzuschließen.

Hier ist das gelöst Beispiel:

%pyspark 
from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

Daraus ergibt sich die genaue Spaltenmittelwerte der Fahrzeuge, die ich suchte:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17.0 
13  2017-03-15 12:27:18.0 15.0 
25  2017-03-18 11:27:18.0 19.0 
1

Wollen Sie dies:

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"), 
          (13, "2017-03-11T12:27:18+00:00"), 
          (21, "2017-03-17T11:27:18+00:00")], 
          ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days")))) 

Ausgang:

+-------+-------------------+---------------+         
|dollars|timestampGMT  |rolling_average| 
+-------+-------------------+---------------+ 
|21  |2017-03-17 19:27:18|21.0   | 
|17  |2017-03-11 23:27:18|15.0   | 
|13  |2017-03-11 20:27:18|15.0   | 
+-------+-------------------+---------------+ 
+0

Dank Zhang, das ist näher an, was ich will, aber nicht genau das, was ich möchte. Ihr Code berechnet weiterhin die Antworten über das Datumsbintern. Ich möchte, dass jeder Wochendurchschnitt am Datum in der Reihe endet. Es ist meine Schuld, kein gutes Beispiel zu geben. Ich werde meinen Beitrag mit einem aktualisierten Beispiel bearbeiten, das zeigt, was ich möchte. –