Frage: Wie das auf einem Spark-Datenrahmen auf eine effiziente Art und Weise zu tun?
Funken DataFrame
ist einfach keine gute Wahl für eine Operation wie dieser. Im Allgemeinen sind SQL-Primitive nicht aussagekräftig genug und PySpark DataFrame
bietet keinen Low-Level-Zugriff, der für die Implementierung erforderlich ist.
Während das erneute Sampling einfach mit Epoch/Timestamp-Arithmetik dargestellt werden kann. Mit Daten wie folgt aus:
from pyspark.sql.functions import col, max as max_, min as min_
df = (spark
.createDataFrame([
("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],
["ts", "val"])
.withColumn("ts", col("ts").cast("date").cast("timestamp")))
wir können wieder Abtastwerteingang:
day = 60 * 60 * 24
epoch = (col("ts").cast("bigint")/day).cast("bigint") * day
with_epoch = df.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()
und kommen mit Bezug:
# Reference range
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")
(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("ts_resampled", col("epoch").cast("timestamp"))
.show(15, False))
## +----------+---------------------+------+---------------------+
## |epoch |ts |val |ts_resampled |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null |null |2012-06-13 02:00:00.0|
## |1339632000|null |null |2012-06-14 02:00:00.0|
## |1339718400|null |null |2012-06-15 02:00:00.0|
## |1339804800|null |null |2012-06-16 02:00:00.0|
## |1339891200|null |null |2012-06-17 02:00:00.0|
## |1339977600|null |null |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null |null |2012-06-20 02:00:00.0|
## |1340236800|null |null |2012-06-21 02:00:00.0|
## |1340323200|null |null |2012-06-22 02:00:00.0|
## |1340409600|null |null |2012-06-23 02:00:00.0|
## |1340496000|null |null |2012-06-24 02:00:00.0|
## |1340582400|null |null |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+
Mit niedrigen Level-APIs ist es möglich, Daten wie diese zu füllen wie ich in meiner Antwort auf Spark/Scala: forward fill with last observation gezeigt habe. Durch den Einsatz von RDDs können wir auch vermeiden, Daten zweimal zu mischen (einmal für die Verknüpfung, einmal für die Neuordnung).
Aber es gibt viel wichtigeres Problem hier. Spark funktioniert optimal, wenn das Problem auf elementweise oder partitionsweise Berechnungen reduziert werden kann. Während Forward Fill der Fall ist, wenn es möglich ist, ist dies, soweit mir bekannt ist, bei häufig verwendeten Zeitreihenmodellen typischerweise nicht der Fall, und wenn ein Vorgang einen sequentiellen Zugriff erfordert, wird Spark überhaupt keine Vorteile bieten.
Wenn Sie also mit Serien arbeiten, die groß genug sind, um verteilte Datenstrukturen zu benötigen, möchten Sie sie wahrscheinlich mit einem Objekt zusammenfassen, das leicht von einem einzigen Computer gehandhabt werden kann und dann Ihr bevorzugtes nicht verteiltes Werkzeug verwendet der Rest.
Wenn Sie mit mehreren Zeitreihen arbeiten, in denen jeder im Speicher behandelt werden kann, gibt es natürlich sparkts
, aber ich weiß, dass Sie sich dessen bereits bewusst sind.
Hallo Null. Wieder eine sehr schöne detaillierte Antwort. Ich habe nur eine Frage: Wie verbindet man sich mit dem Epochendatenfeld und dem Original, wenn die Zeitstempel nicht übereinstimmen? Imagine var1 hat andere Zeitstempel als var2, aber beide sollten mit den Epochen übereinstimmen. – Matthias
Ich kenne Ihren Beitrag für scala-forward-fill. Irgendeine Chance, das in PySpark zu tun? – Matthias