2017-03-21 2 views
1

so schreibe ich eine benutzerdefinierte Datenquelle zum Lesen von Daten aus HBase. Alles in Ordnung, bis ExistingRDD.rowToRowRdd Anruf bekommen. Dann versucht es das Schema von meinem GenericRowWithSchema zu bekommen. Es scheitert Ich habe keine Ideia warum ... Ich sah Menschen, die in der Vergangenheit ähnliche Probleme hatten. Ich bin auf Spark-1.6.3 und mein Schema festgelegt:Spark Benutzerdefinierte DataSource

StructType(Seq(StructField("Date", LongType), 
     StructField("Device", StringType), 
     StructField("Tag", StringType), 
     StructField("TagValue", DoubleType)) 
    ) 

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): scala.MatchError: 1451610000 (of class java.lang.Long) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:59) 
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:56) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88) 
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) 

Any ideias?

Antwort

1

Also ich finde warum. On override def buildScan (requiredColumns: Array [String], Filter: Array [Filter]): RDD [Row] = {....} Ihre Zeile muss exakt die erforderlichen Spalten haben