2017-11-12 5 views
2

Mit Spark 2.x benutze ich die Datenframes.Karte in einem Spark-Dataframe

val proposals = spark.read 
    .option("header", true) 
    .option("inferSchema", true) 
    .option("delimiter", ";") 
    .csv("/proposals.txt.gz") 

proposals.printSchema() 

, die funktioniert gut und gibt:

root 
|-- MARKETCODE: string (nullable = true) 
|-- REFDATE: string (nullable = true) 
|-- UPDTIME: string (nullable = true) 
|-- UPDTIMEMSEC: integer (nullable = true) 
|-- ENDTIME: string (nullable = true) 
|-- ENDTIMEMSEC: integer (nullable = true) 
|-- BONDCODE: string (nullable = true) 

Jetzt würde Ich mag eine Zeit in Millisekunden berechnen und somit eine Funktion geschrieben haben:

def time2usecs(time:String, msec:Int)={ 
    val Array(hour,minute,seconds) = time.split(":").map(_.toInt) 
    msec + seconds.toInt*1000 + minute.toInt*60*1000 + hour.toInt*60*60*1000 
} 
time2usecs("08:13:44", 111) 


time2usecs: (time: String, msec: Int)Int 
res90: Int = 29624111 

Der letzte Frieden von der puzzle das wäre so etwas wie:

proposals.withColumn("utime", 
    proposals.select("UPDTIME","UPDTIMEMSEC") 
    .map((t,tms) => time2usecs(t,tms))) 

Aber ich kann nicht herausfinden, wie man den df.select(column1, column2).map(...) Teil macht.

Antwort

1

Warum verwenden Sie SQL nicht den ganzen Weg?

import org.apache.spark.sql.Column 
import org.apache.spark.sql.functions._ 

def time2usecs(time: Column, msec: Column) = { 
    val bits = split(time, ":") 
    msec + bits(2).cast("int") * 1000 + bits(1).cast("int") * 60 * 1000 + 
    bits(0).cast("int") *60*60*1000 
} 

df.withCoulmn("ts", time2usecs(col(""UPDTIME"), col("UPDTIMEMSEC")) 

mit Ihrem Code müssen Sie:

proposals 
    .select("UPDTIME","UPDTIMEMSEC") 
    .as[(String, Int)] 
    .map { case (t, s) => time2usecs(t, s) } 
2

Der gemeinsame Ansatz eine Methode für Datenrahmen Spalten in Spark verwendet, ist eine UDF (benutzerdefinierte Funktion zu definieren, siehe here für mehr Information). Für Ihren Fall:

import org.apache.spark.sql.functions.udf 
import spark.implicits._ 

val time2usecs = udf((time: String, msec: Int) => { 
    val Array(hour,minute,seconds) = time.split(":").map(_.toInt) 
    msec + seconds.toInt*1000 + minute.toInt*60*1000 + hour.toInt*60*60*1000 
}) 

val df2 = df.withColumn("utime", time2usecs($"UPDTIME", $"UPDTIMEMSEC")) 

spark.implicits._ wird hier importiert die Verwendung der $ Abkürzung für die col() Funktion zu ermöglichen.

Verwandte Themen