Das Argument count
der lag
Funktion nimmt eine ganze Zahl nicht eine Spaltenobjekt:
psf.lag(col, count=1, default=None)
Daher kann es nicht ein „dynamischer“ Wert sein. Stattdessen können Sie Ihre Verzögerung in einer Spalte erstellen und dann die Tabelle mit sich selbst verbinden.
Lassen Sie uns zunächst unsere Datenrahmen erstellen:
df = spark.createDataFrame(
sc.parallelize(
[[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]]
),
["int", "date"]
)
Wir wollen die Zeilen aufzuzählen:
from pyspark.sql import Window
import pyspark.sql.functions as psf
df = df.withColumn(
"id",
psf.monotonically_increasing_id()
)
w = Window.orderBy("id")
df = df.withColumn("rn", psf.row_number().over(w))
+---+----------+-----------+---+
|int| date| id| rn|
+---+----------+-----------+---+
| 1|2011-01-01|17179869184| 1|
| 1|2012-01-01|42949672960| 2|
| 2|2013-01-01|68719476736| 3|
| 1|2014-01-01|94489280512| 4|
+---+----------+-----------+---+
nun die Verzögerung zu bauen:
df1 = df.select(
"int",
df.date.alias("date1"),
(df.rn - df.int).alias("rn")
)
df2 = df.select(
df.date.alias("date2"),
'rn'
)
Endlich können wir ihnen anschließen und Berechnen Sie den Datumsunterschied:
df1.join(df2, "rn", "inner").withColumn(
"date_diff",
psf.datediff("date1", "date2")
).drop("rn")
+---+----------+----------+---------+
|int| date1| date2|date_diff|
+---+----------+----------+---------+
| 1|2012-01-01|2011-01-01| 365|
| 2|2013-01-01|2011-01-01| 731|
| 1|2014-01-01|2013-01-01| 365|
+---+----------+----------+---------+
Überprüfen Sie die Antwort hier: https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark sonst setzen Sie mehr Details über die Probleme und den Datensatz – MedAli