Dies erreicht eine window function Verwendung werden kann.
import org.apache.spark.sql.expressions.Window
import sqlContext.implicits._
val df = sc.parallelize(Seq(("cat", 8, "01:00"),("cat", 5, "02:00"),("cat", 6, "03:00"),("dog", 2, "02:00"),("dog", 4, "04:00"),("dog", 3, "06:00"),("rat", 7, "01:00"),("rat", 4, "03:00"),("rat", 9, "05:00"))).toDF("animal", "value", "time")
df.show
+------+-----+-----+
|animal|value| time|
+------+-----+-----+
| cat| 8|01:00|
| cat| 5|02:00|
| cat| 6|03:00|
| dog| 2|02:00|
| dog| 4|04:00|
| dog| 3|06:00|
| rat| 7|01:00|
| rat| 4|03:00|
| rat| 9|05:00|
+------+-----+-----+
Ich habe ein "Zeit" -Feld hinzugefügt, um orderBy zu veranschaulichen.
val w1 = Window.partitionBy($"animal").orderBy($"time")
val previous_value = lag($"value", 1).over(w1)
val df1 = df.withColumn("previous", previous_value)
df1.show
+------+-----+-----+--------+
|animal|value| time|previous|
+------+-----+-----+--------+
| dog| 2|02:00| null|
| dog| 4|04:00| 2|
| dog| 3|06:00| 4|
| cat| 8|01:00| null|
| cat| 5|02:00| 8|
| cat| 6|03:00| 5|
| rat| 7|01:00| null|
| rat| 4|03:00| 7|
| rat| 9|05:00| 4|
+------+-----+-----+--------+
Wenn Sie NULL-Werte mit 0 ersetzt werden soll:
val df2 = df1.na.fill(0)
df2.show
+------+-----+-----+--------+
|animal|value| time|previous|
+------+-----+-----+--------+
| dog| 2|02:00| 0|
| dog| 4|04:00| 2|
| dog| 3|06:00| 4|
| cat| 8|01:00| 0|
| cat| 5|02:00| 8|
| cat| 6|03:00| 5|
| rat| 7|01:00| 0|
| rat| 4|03:00| 7|
| rat| 9|05:00| 4|
+------+-----+-----+--------+
Ich habe keine Zeit es jetzt zu versuchen, aber 1) verlassen Sie sich nicht auf Datenrahmen bestellen, fügen Sie eine explizite 'index' Spalte und 2) versuchen Sie "partitionieren" mit "Tier" und verwenden Sie dann "mapPartitions", um Ihre Zeilenverschiebung durchzuführen. Es wird wahrscheinlich nicht schön sein. –