2015-09-07 9 views
8

Ich habe einen Datenrahmen mit 2 Spalten: Zeitstempel, Wert Zeitstempel ist eine Zeit seit der Epoche und Wert ist ein Float-Wert. Ich möchte Zeilen um Durchschnittswerte von min zusammenführen. Das bedeutet, dass ich alle Zeilen nehmen möchte, deren Zeitstempel von der gleichen Rundenminute ist (60 Sekunden-Intervalle seit der Epoche) und sie zu einer einzelnen Zeile zusammenfasse, wobei die Wertspalte der Mittelwert aller Werte ist.Mehrere Zeilen in einem Spark-Dataform in eine einzelne Zeile zusammenführen

ein Beispiel zu geben, läßt vermuten, dass mein Datenrahmen wie folgt aussieht:

timestamp  value 
---------  ----- 
1441637160  10.0 
1441637170  20.0 
1441637180  30.0 
1441637210  40.0 
1441637220  10.0 
1441637230  0.0 

Die ersten 4 Zeilen sind Teil derselben min (1441637160% 60 == 0, 1441637160 + 60 == 1441637220) Die letzten 2 Zeilen sind Teil eines anderen min. Ich möchte alle Zeilen der gleichen min zusammenführen. um ein Ergebnis zu erhalten, die wie folgt aussieht:

timestamp  value 
---------  ----- 
1441637160  25.0 (since (10+20+30+40)/4 = 25) 
1441637220  5.0 (since (10+0)/2 = 5) 

Was ist der beste Weg, das zu tun?

Antwort

5

Sie können einfach gruppieren und aggregieren. Mit Daten als:

val df = sc.parallelize(Seq(
    (1441637160, 10.0), 
    (1441637170, 20.0), 
    (1441637180, 30.0), 
    (1441637210, 40.0), 
    (1441637220, 10.0), 
    (1441637230, 0.0))).toDF("timestamp", "value") 

import erforderlichen Funktionen und Klassen:

import org.apache.spark.sql.functions.{lit, floor} 
import org.apache.spark.sql.types.IntegerType 

Intervall Spalte erstellen:

val tsGroup = (floor($"timestamp"/lit(60)) * lit(60)) 
    .cast(IntegerType) 
    .alias("timestamp") 

und es verwenden, Aggregation auszuführen:

df.groupBy(tsGroup).agg(mean($"value").alias("value")).show 

// +----------+-----+ 
// | timestamp|value| 
// +----------+-----+ 
// |1441637160| 25.0| 
// |1441637220| 5.0| 
// +----------+-----+ 
1

Zuerst den Zeitstempel auf den Minutenbereich abbilden und dann mit groupByKey die Durchschnittswerte berechnen. Zum Beispiel:

rdd.map(x=>{val round = x._1%60; (x._1-round, x._2);}) 
.groupByKey 
.map(x=>(x._1, (x._2.sum.toDouble/x._2.size))) 
.collect() 
Verwandte Themen