2016-07-28 4 views
1

Ich mache gerne die folgende Umwandlung. Wenn ein Datenrahmen angegeben wird, der angibt, ob ein Benutzer jede Stunde aktiv ist, und eine ununterbrochene Anzahl von Stunden, die der Benutzer aktiv ist, als Sitzung gezählt wird, versuche ich, die kumulative Stunde in jeder Sitzung zu sammeln.Spark (Scala): Berechnen Sitzungslänge des Benutzers in DataFrame mit kontinuierlichen aktiven Stunden

Zum Beispiel würde der ursprüngliche Datenrahmen wie folgt aussehen:

scala> val df = sc.parallelize(List(
    ("user1",0,true), 
    ("user1",1,true), 
    ("user1",2,false), 
    ("user1",3,true), 
    ("user1",4,false), 
    ("user1",5,false), 
    ("user1",6,true), 
    ("user1",7,true), 
    ("user1",8,true) 
)).toDF("user_id","hour_of_day","is_active") 
df: org.apache.spark.sql.DataFrame = [user_id: string, hour_of_day: int, is_active: boolean] 

    +-------+-----------+---------+ 
    |user_id|hour_of_day|is_active| 
    +-------+-----------+---------+ 
    |user1 |0   |true  | 
    |user1 |1   |true  | 
    |user1 |2   |false | 
    |user1 |3   |true  | 
    |user1 |4   |false | 
    |user1 |5   |false | 
    |user1 |6   |true  | 
    |user1 |7   |true  | 
    |user1 |8   |true  | 
    +-------+-----------+---------+ 

Und ich möchte zwei Spalten hinzufügen, die verfolgt, wenn eine Sitzung beginnen und die Länge der Sitzung. Eine dieser Spalten zu bekommen würde mir erlauben, die andere zu lösen, also würde beides funktionieren.

Beispiel unten:

+-------+-----------+---------+------------------+--------------+ 
    |user_id|hour_of_day|is_active|session_begin_hour|session_length| 
    +-------+-----------+---------+------------------+--------------+ 
    |user1 |0   |true  |0     |1    | 
    |user1 |1   |true  |0     |2    | 
    |user1 |2   |false |null    |0    | 
    |user1 |3   |true  |3     |1    | 
    |user1 |4   |false |null    |0    | 
    |user1 |5   |false |null    |0    | 
    |user1 |6   |true  |6     |1    | 
    |user1 |7   |true  |6     |2    | 
    |user1 |8   |true  |6     |3    | 
    +-------+-----------+---------+------------------+--------------+ 

Ich versuchte WindowSpec zu verwenden, um eine Reihe zurück zu blicken, aber das würde mir nicht erlauben, den Wert für die Spalte zu berechnen, basierend auf der letzten Zeile, wenn diese Spalte nicht vorhanden ist im ursprünglichen DF.

Gibt es eine elegante Lösung, um dieses Problem zu lösen, vorzugsweise in Scala.

Vielen Dank im Voraus!

Antwort

2

Lassen Sie uns zunächst feststellen, ob gegebenen Datensatz den Beginn der Sitzung markiert:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 

val userWindow = Window.partitionBy($"user_id").orderBy($"hour_of_day") 
val prevActive = lag($"is_active", 1).over(userWindow) 
val newSession = $"is_active" && (prevActive.isNull || not(prevActive)) 

val withInd = df.withColumn("new_session", newSession) 

// +-------+-----------+---------+-----------+ 
// |user_id|hour_of_day|is_active|new_session| 
// +-------+-----------+---------+-----------+ 
// | user1|   0|  true|  true| 
// | user1|   1|  true|  false| 
// | user1|   2| false|  false| 
// | user1|   3|  true|  true| 
// | user1|   4| false|  false| 
// | user1|   5| false|  false| 
// | user1|   6|  true|  true| 
// | user1|   7|  true|  false| 
// | user1|   8|  true|  false| 
// +-------+-----------+---------+-----------+ 

Als nächstes wollen wir Session-ID generieren:

val session = when(
    $"is_active", 
    sum($"new_session".cast("long")).over(userWindow) 
) 

val withSession = withInd.withColumn("session", session) 

// +-------+-----------+---------+-----------+-------+ 
// |user_id|hour_of_day|is_active|new_session|session| 
// +-------+-----------+---------+-----------+-------+ 
// | user1|   0|  true|  true|  1| 
// | user1|   1|  true|  false|  1| 
// | user1|   2| false|  false| null| 
// | user1|   3|  true|  true|  2| 
// | user1|   4| false|  false| null| 
// | user1|   5| false|  false| null| 
// | user1|   6|  true|  true|  3| 
// | user1|   7|  true|  false|  3| 
// | user1|   8|  true|  false|  3| 
// +-------+-----------+---------+-----------+-------+ 

Schließlich wollen wir ein neues Fenster und Rechenwerte von Interesse erstellen:

val userSessionWindow = userWindow.partitionBy($"user_id", $"session") 

val sessionBeginHour = when(
    $"is_active", 
    min($"hour_of_day").over(userSessionWindow) 
) 

val sessionLength = when(
    $"is_active", 
    $"hour_of_day" + 1 - sessionBeginHour 
).otherwise(0) 

val result = withSession 
    .withColumn("session_begin_hour", sessionBeginHour) 
    .withColumn("session_length", sessionLength) 
    .drop("new_session") 
    .drop("session") 

result.orderBy($"hour_of_day").show 
// +-------+-----------+---------+------------------+--------------+ 
// |user_id|hour_of_day|is_active|session_begin_hour|session_length| 
// +-------+-----------+---------+------------------+--------------+ 
// | user1|   0|  true|     0|    1| 
// | user1|   1|  true|     0|    2| 
// | user1|   2| false|    null|    0| 
// | user1|   3|  true|     3|    1| 
// | user1|   4| false|    null|    0| 
// | user1|   5| false|    null|    0| 
// | user1|   6|  true|     6|    1| 
// | user1|   7|  true|     6|    2| 
// | user1|   8|  true|     6|    3| 
// +-------+-----------+---------+------------------+--------------+ 
Verwandte Themen