2016-08-10 3 views
1

Ich habe einen Datensatz, der ungefähr 200 KB groß ist. Ich habe die Daten bereinigt und lud sie in ein RDD Spark (pyspark verwendet wird), so dass der Header-Format ist die folgende:Aggregierte Zeit zwischen abwechselnden Zeilen

Employee ID | Timestamp (MM/DD/YYYY HH:MM) | Location 

Dieser Datensatz speichert Mitarbeiter Stempel-in und Stempel-out-Zeiten, und ich müssen die Zeit aufaddieren, die sie bei der Arbeit verbracht haben. Angenommen, das Format der Zeilen ist sauber und streng alternierend (also einprägen, ausstempeln, einwerfen, ausheften usw.), gibt es eine Möglichkeit, die in Spark verbrachte Zeit zu aggregieren?

Ich habe versucht, Filter auf alle "Stempel in" -Werte zu verwenden und die Zeit mit dem Wert in der Zeile direkt nach (so r + 1) zu aggregieren, aber dies erweist sich als sehr schwierig, um nicht teuer zu sein. Ich denke, das wäre in einer Sprache wie Java oder Python einfach zu machen, aber bevor ich umschalte, fehlt mir eine Lösung, die in Spark implementiert werden kann?

+0

dort Unter der Annahme, höchstens 2 Zeitstempel für einen bestimmten Tag und Mitarbeiter, könnten Sie eine zusätzliche Spalte für das Datum des Zeitstempels erstellen. Dann machen Sie eine Gruppe von Mitarbeiter und Datum. Jetzt sind die Stempel und die Stempel in der gleichen Reihe und die Berechnung der Differenz innerhalb eines Funken RDD ist trivial. – mrwyatt

+0

@mrwyatt Es gibt nicht nur zwei Mal pro Tag, aber ich habe immer noch eine Gruppierung über den Namen und das Datum des Mitarbeiters gemacht, um die Daten einfacher zu durchlaufen. Vielen Dank. – psak

Antwort

0

Sie können versuchen, mit Hilfe der Fensterfunktion lead:

from pyspark.sql import Window 
from pyspark.sql.functions import * 

window = Window.partitionBy("id").orderBy("timestamp") 
newDf = df.withColumn("stampOut", lead("timestamp", 1).over(window)).where(col("stampOut").isNotNull()) 

finalDf = newDf.select(col("id"), col("stampOut") - col("timestamp"))