3

Wie berechne ich Rolling Median von Dollar für eine Fenstergröße der vorherigen 3 Werte?Wie berechnet man Rolling Median in Pyspark mit Window()?

Eingangsdaten

dollars timestampGMT  
25  2017-03-18 11:27:18 
17  2017-03-18 11:27:19 
13  2017-03-18 11:27:20 
27  2017-03-18 11:27:21 
13  2017-03-18 11:27:22 
43  2017-03-18 11:27:23 
12  2017-03-18 11:27:24 

Erwartete Ausgangsdaten

dollars timestampGMT   rolling_median_dollar 
25  2017-03-18 11:27:18 median(25) 
17  2017-03-18 11:27:19 median(17,25) 
13  2017-03-18 11:27:20 median(13,17,25) 
27  2017-03-18 11:27:21 median(27,13,17) 
13  2017-03-18 11:27:22 median(13,27,13) 
43  2017-03-18 11:27:23 median(43,13,27) 
12  2017-03-18 11:27:24 median(12,43,13) 

Below Code nicht bewegt avg aber pyspark muß nicht F.median().

pyspark: rolling average using timeseries data

EDIT 1: Die Herausforderung ist median() Funktion nicht herausnimmt. Ich kann nicht

df = df.withColumn('rolling_average', F.median("dollars").over(w)) 

tun, wenn ich gleitenden Durchschnitt wollte ich

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

EDIT 2 getan haben könnte: Versuchte mit approxQuantile()

windfun = Window().partitionBy().orderBy(F.col(date_column)).rowsBetwe‌​en(-3, 0) sdf.withColumn("movingMedian", sdf.approxQuantile(col='a', probabilities=[0.5], relativeError=0.00001).over(windfun)) 

Aber immer Fehler

AttributeError: 'list' object has no attribute 'over' 

EDIT 3

Bitte geben Sie eine Lösung ohne UDF, da es nicht von der Katalysatoroptimierung profitieren wird.

+0

Haben Sie versucht, durch 'timestampGMT' und tun, um die Berechnung zu bestellen über die Zeilen pro Fenster? Ich bin nur neugierig, was das Problem ist (und frage mich, ob die Implementierung des Median der eine sein könnte). –

+0

bearbeitet die Frage, um das genaue Problem zu enthalten –

+0

Gesehen 'df.stat.approxQuantile' und https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles .html? –

Antwort

2

Eine Möglichkeit ist es, die $dollars Spalte als Liste pro Fenster zu sammeln, und dann den Median der Ergebnislisten berechnen unter Verwendung eines udf:

from pyspark.sql.window import Window 
from pyspark.sql.functions import * 
import numpy as np 
from pyspark.sql.types import FloatType 

w = (Window.orderBy(col("timestampGMT").cast('long')).rangeBetween(-2, 0)) 
median_udf = udf(lambda x: float(np.median(x)), FloatType()) 

df.withColumn("list", collect_list("dollars").over(w)) \ 
    .withColumn("rolling_median", median_udf("list")).show(truncate = False) 
+-------+---------------------+------------+--------------+ 
|dollars|timestampGMT   |list  |rolling_median| 
+-------+---------------------+------------+--------------+ 
|25  |2017-03-18 11:27:18.0|[25]  |25.0   | 
|17  |2017-03-18 11:27:19.0|[25, 17] |21.0   | 
|13  |2017-03-18 11:27:20.0|[25, 17, 13]|17.0   | 
|27  |2017-03-18 11:27:21.0|[17, 13, 27]|17.0   | 
|13  |2017-03-18 11:27:22.0|[13, 27, 13]|13.0   | 
|43  |2017-03-18 11:27:23.0|[27, 13, 43]|27.0   | 
|12  |2017-03-18 11:27:24.0|[13, 43, 12]|13.0   | 
+-------+---------------------+------------+--------------+ 
+1

Danke. Aber können wir es ohne UDF machen, da es nicht von der Katalysatoroptimierung profitiert? –

+0

gibt es keine native Spark-Alternative, fürchte ich. – mtoto

+0

Was ist mit der Verwendung von percentRank() mit Fensterfunktion? Ich habe irgendwo gelesen, aber Code wurde nicht gegeben. Ruft das eine Glocke? –

Verwandte Themen