2016-09-01 2 views
2

Stellen Sie sich ein Spark Dataframe bestehend aus Wert Beobachtungen von Variablen. Jede Beobachtung hat einen bestimmten Zeitstempel und diese Zeitstempel sind nicht zwischen den verschiedenen Variablen identisch. Dies liegt daran, dass der Zeitstempel generiert wird, wenn der Wert einer Variablen geändert und aufgezeichnet wird.PySpark: Wie resample Frequenzen

#Variable  Time    Value 
#852-YF-007 2016-05-10 00:00:00 0 
#852-YF-007 2016-05-09 23:59:00 0 
#852-YF-007 2016-05-09 23:58:00 0 

Problem Ich möchte alle Variablen in die gleiche Frequenz setzen (zum Beispiel 10 Minuten) unter Verwendung von Vorwärts-fill. Um dies zu visualisieren, habe ich eine Seite aus dem Buch "Python for Data Analysis" kopiert. Frage: Wie man das auf einem Spark-Datenframe in einem effizienten Weise tun?

Python for Data Analysis

Antwort

6

Frage: Wie das auf einem Spark-Datenrahmen auf eine effiziente Art und Weise zu tun?

Funken DataFrame ist einfach keine gute Wahl für eine Operation wie dieser. Im Allgemeinen sind SQL-Primitive nicht aussagekräftig genug und PySpark DataFrame bietet keinen Low-Level-Zugriff, der für die Implementierung erforderlich ist.

Während das erneute Sampling einfach mit Epoch/Timestamp-Arithmetik dargestellt werden kann. Mit Daten wie folgt aus:

from pyspark.sql.functions import col, max as max_, min as min_ 

df = (spark 
    .createDataFrame([ 
     ("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)], 
     ["ts", "val"])   
    .withColumn("ts", col("ts").cast("date").cast("timestamp"))) 

wir können wieder Abtastwerteingang:

day = 60 * 60 * 24 
epoch = (col("ts").cast("bigint")/day).cast("bigint") * day 

with_epoch = df.withColumn("epoch", epoch) 

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first() 

und kommen mit Bezug:

# Reference range 
ref = spark.range(
    min_epoch, max_epoch + 1, day 
).toDF("epoch") 

(ref 
    .join(with_epoch, "epoch", "left") 
    .orderBy("epoch") 
    .withColumn("ts_resampled", col("epoch").cast("timestamp")) 
    .show(15, False)) 

## +----------+---------------------+------+---------------------+ 
## |epoch  |ts     |val |ts_resampled   | 
## +----------+---------------------+------+---------------------+ 
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0| 
## |1339545600|null     |null |2012-06-13 02:00:00.0| 
## |1339632000|null     |null |2012-06-14 02:00:00.0| 
## |1339718400|null     |null |2012-06-15 02:00:00.0| 
## |1339804800|null     |null |2012-06-16 02:00:00.0| 
## |1339891200|null     |null |2012-06-17 02:00:00.0| 
## |1339977600|null     |null |2012-06-18 02:00:00.0| 
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0| 
## |1340150400|null     |null |2012-06-20 02:00:00.0| 
## |1340236800|null     |null |2012-06-21 02:00:00.0| 
## |1340323200|null     |null |2012-06-22 02:00:00.0| 
## |1340409600|null     |null |2012-06-23 02:00:00.0| 
## |1340496000|null     |null |2012-06-24 02:00:00.0| 
## |1340582400|null     |null |2012-06-25 02:00:00.0| 
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0| 
## +----------+---------------------+------+---------------------+ 

Mit niedrigen Level-APIs ist es möglich, Daten wie diese zu füllen wie ich in meiner Antwort auf Spark/Scala: forward fill with last observation gezeigt habe. Durch den Einsatz von RDDs können wir auch vermeiden, Daten zweimal zu mischen (einmal für die Verknüpfung, einmal für die Neuordnung).

Aber es gibt viel wichtigeres Problem hier. Spark funktioniert optimal, wenn das Problem auf elementweise oder partitionsweise Berechnungen reduziert werden kann. Während Forward Fill der Fall ist, wenn es möglich ist, ist dies, soweit mir bekannt ist, bei häufig verwendeten Zeitreihenmodellen typischerweise nicht der Fall, und wenn ein Vorgang einen sequentiellen Zugriff erfordert, wird Spark überhaupt keine Vorteile bieten.

Wenn Sie also mit Serien arbeiten, die groß genug sind, um verteilte Datenstrukturen zu benötigen, möchten Sie sie wahrscheinlich mit einem Objekt zusammenfassen, das leicht von einem einzigen Computer gehandhabt werden kann und dann Ihr bevorzugtes nicht verteiltes Werkzeug verwendet der Rest.

Wenn Sie mit mehreren Zeitreihen arbeiten, in denen jeder im Speicher behandelt werden kann, gibt es natürlich sparkts, aber ich weiß, dass Sie sich dessen bereits bewusst sind.

+0

Hallo Null. Wieder eine sehr schöne detaillierte Antwort. Ich habe nur eine Frage: Wie verbindet man sich mit dem Epochendatenfeld und dem Original, wenn die Zeitstempel nicht übereinstimmen? Imagine var1 hat andere Zeitstempel als var2, aber beide sollten mit den Epochen übereinstimmen. – Matthias

+0

Ich kenne Ihren Beitrag für scala-forward-fill. Irgendeine Chance, das in PySpark zu tun? – Matthias

1

antwortete ich einmal eine ähnliche Frage, it'a bisschen wie ein Hack, aber die Idee macht in Ihrem Fall sinnvoll. Ordnen Sie jeden Wert einer Liste zu und reduzieren Sie die Liste dann vertikal.

Inserting records in a spark dataframe

+0

Aber das ist nicht Resampling die Frequenzen der Zeitstempel. – Matthias

+0

Wenn Sie Ihre erste Zeile Row ([2012-06-13, 0.694283]) zu Row zuordnen ([2012-06-13, 2012-06-14, 2012-06-15] [0.694283, 0.694283, 0.694283]) dann glätten Sie es, erhalten Sie die ersten drei Reihen der zweiten Tabelle Ihres Buches. Ich weiß nicht was "resampling die Frequenzen der Zeitstempel" bedeutet. – marmouset

+0

Es bedeutet, dass Sie Variablen mit unterschiedlichen Zeitstempeln haben. Eine Variable zum Beispiel mit einem Wert von 5 um 8:10 Uhr und einem Wert von 7 um 8:14 Uhr. Eine zweite Variable mit Wert 4 um 8:09 Uhr und so weiter. Jetzt wollen Sie einen Datenrahmen mit 1min Intervall (Frequenz) ab 8:10 Uhr. Sie brauchen Zeitstempel wie 8:10, 8:11, 8:12, 8:13, 8:14 und die Werte für var1 und var2 zu diesen Zeiten. Vor var1 wäre das 5, 5, 5, 5, 7 und var2 ist 4, 4, 4, 4, 4. – Matthias