Spark: read CSV with a Unix timestamp

I have a CSV file with the following schema and sample data:

userId,movieId,tag,timestamp 
28,63062,angelina jolie,1263047558 

I have the following code to read this file:

import org.apache.spark.sql.types._ 

val inputPath = "FileStore/tables/o8pa07nd1495067426592/tags.csv" 

val customSchema = StructType(Array(
    StructField("userId", StringType, true), 
    StructField("movieId", StringType, true), 
    StructField("tag", StringType, true), 
    StructField("timestamp", TimestampType, true))) 

val df = spark.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") // Use first line of all files as header 
    .schema(customSchema) 
    .load(inputPath) 

I have the following code to select data from the DataFrame:

df.select($"timestamp", $"tag").show(10) 

However, it fails with a java.lang.IllegalArgumentException.

In customSchema, if I change the type from TimestampType to LongType, it works fine. But I want to read the column as a timestamp. How can I do that?
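
For reference, the working variant differs only in the last field of the schema (everything else stays as above):

StructField("timestamp", LongType, true)  // reads the epoch seconds as a plain Long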

This is the complete error log:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException 
    at java.sql.Date.valueOf(Date.java:143) 
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:283) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:283) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:283) 
    at scala.util.Try.getOrElse(Try.scala:77) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:280) 
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:125) 
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:94) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:173) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:172) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:235) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:217) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1430) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1429) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1429) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1657) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1612) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1937) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1950) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1963) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 
    at org.apache.spark.sql.Dataset$$anonfun$7.apply(Dataset.scala:252) 
    at org.apache.spark.sql.Dataset$$anonfun$7.apply(Dataset.scala:248) 
    at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791) 
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87) 
    at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) 
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:643) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:602) 
Caused by: java.lang.IllegalArgumentException 
    at java.sql.Date.valueOf(Date.java:143) 
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:283) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:283) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:283) 
    at scala.util.Try.getOrElse(Try.scala:77) 
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:280) 
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:125) 
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:94) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:173) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:172) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:235) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:217) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Can you add more of the error log that you are getting? –

Not an answer to your question, but I assume you would convert it into a human-readable date later anyway. Why not read it as a Long and then convert it to a Date later? – Jagannath

Can you please append the lines of the actual error? –

Answer

java.sql.Date.valueOf throws an IllegalArgumentException when the date is not given in the JDBC date escape format (yyyy-mm-dd). In your sample input, the data is in Unix epoch format (seconds since 1970-01-01) instead.
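
Seen in isolation, this is easy to reproduce in a Scala REPL (the literal values below are only illustrative):

java.sql.Date.valueOf("2010-01-09")  // JDBC escape format yyyy-[m]m-[d]d, parses fine
java.sql.Date.valueOf("1263047558")  // epoch seconds, throws java.lang.IllegalArgumentException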

One way to solve this is to read the field as a Long and then convert it to a timestamp. The from_unixtime function in the org.apache.spark.sql.functions package is one option, or you could write a UDF. I would recommend the from_unixtime approach; note that from_unixtime returns a formatted string, so a cast is needed to end up with a real TimestampType column:

import org.apache.spark.sql.functions.from_unixtime
df.withColumn("time", from_unixtime($"timestamp").cast("timestamp"))
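
For completeness, here is a minimal end-to-end sketch under the question's setup (it assumes the spark session and inputPath from the question; the names longSchema, raw, epochToTs, viaBuiltin, and viaUdf are only illustrative), including the UDF alternative mentioned above:

import org.apache.spark.sql.functions.{from_unixtime, udf}
import org.apache.spark.sql.types._
import java.sql.Timestamp
import spark.implicits._

// Read the epoch-seconds column as a plain Long first.
val longSchema = StructType(Array(
    StructField("userId", StringType, true),
    StructField("movieId", StringType, true),
    StructField("tag", StringType, true),
    StructField("timestamp", LongType, true)))

val raw = spark.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .schema(longSchema)
    .load(inputPath)

// Option 1: from_unixtime formats the seconds as "yyyy-MM-dd HH:mm:ss";
// the cast turns that string into a real TimestampType column.
val viaBuiltin = raw.withColumn("time", from_unixtime($"timestamp").cast(TimestampType))

// Option 2: a small UDF, converting epoch seconds to the
// milliseconds that java.sql.Timestamp expects.
val epochToTs = udf((s: Long) => new Timestamp(s * 1000L))
val viaUdf = raw.withColumn("time", epochToTs($"timestamp"))

viaBuiltin.select($"time", $"tag").show(10)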