2016-11-28 6 views
2

Ich möchte eine CSV-Datei mit Spark lesen und die Spalten mit passenden Typen zuordnen.(Scala) Konvertieren Zeichenfolge in Apache Spark

val conf = new SparkConf() 
     .setMaster("local[8]") 
     .setAppName("Name") 

    val sc = new SparkContext(conf) 

    val sqlContext = new SQLContext(sc) 

    val customSchema = StructType(Array(
     StructField("date", DateType, true), 
     StructField("time",StringType, true), 
     StructField("am", DoubleType, true), 
     StructField("hum", DoubleType, true), 
     StructField("temp", DoubleType, true) 
    )) 

    val df = sqlContext.read 
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") 
      .option("header","true") 
      .option("delimiter",";") 
      .schema(customSchema) 
      .load("data.csv") 

Eine Zeile der CSV-I sieht lese wie diese

+----------+--------+-----+-----+-----+ 
|  date| time| am| hum| temp| 
+----------+--------+-----+-----+-----+ 
|04.10.2016|12:51:20|1.121|0.149|0.462| 
+----------+--------+-----+-----+-----+ 

Funken die CSV-lesen und die Typen richtig zuordnen, wenn ich den Typen für das Datum String gesetzt. Wenn ich die customSchema wie in dem Code halte oben gezeigt, Spark eine Ausnahme aufgrund des falschen Datumsformates (DateType will expect YYYY-MM-DD while mine is DD.MM.YYYY).

Gibt es eine Möglichkeit umformatieren das Datum Strings zu YYYY-MM-DD werfen und das Schema gelten danach ? Oder kann ich den von Spark angegebenen DateType auch durch Hinzufügen von Parametern ändern?

Vielen Dank im Voraus

Antwort

4

Verwenden dateFormat Option:

val df = sqlContext.read 
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") 
    .option("header","true") 
    .option("delimiter",";") 
    .option("dateFormat", "dd.MM.yyyy") 
    .schema(customSchema) 
    .load("data.csv") 
+0

Danke, das hat funktioniert. Können Sie mir sagen, ob es eine Liste mit verfügbaren Optionen für den DataFrameReader gibt? Ich konnte noch keinen finden. –

+0

https://github.com/databricks/spark-csv#features –

0

ich die Termine Parsen danach empfehlen. Siehe auch this.

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts") 
import org.apache.spark.sql.functions.unix_timestamp 

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") 

df.withColumn("ts", ts).show(2, false) 
// +---+-------------------+---------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+---------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| 
// |2 |#[email protected]#@#    |null     | 
// +---+-------------------+---------------------+ 

und:

scala> date.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")) 
res0: String = 2020.01.01 

Auch als eine Randnotiz, da Funken 2.0, verwenden Sie Funkensitzungsobjekt nur und Verwendung Geber für Schema Folgern (statt sc, SqlContext usw.). Etwas wie dieses:

spark = SparkSession(...) 
case class User(id:Int, city:String, loc:Array[Double], pop:Long, state:String) 
val users = (spark.read.option("inferSchema","true").option("header","true").csv("data/users1.csv").as[User]) 
Verwandte Themen