1

ich als JSON-Eingabedatei haben:scala.MatchError: [abc, cde, null, 3] (der Klasse org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) in Spark-JSON mit fehlenden Feldern

{"a": "abc", "b": "bcd", "d": 3}, 
{"a": "ezx", "b": "hdg", "c": "ssa"}, 
... 

einige Felder pro Objekt fehlen, anstatt den null Wert zu platzieren.

In Apache Spark mit Scala:

import SparkCommons.sparkSession.implicits._ 

private val inputJsonPath: String = "resources/input/input.json" 

private val schema = StructType(Array(
    StructField("a", StringType, nullable = false), 
    StructField("b", StringType, nullable = false), 
    StructField("c", StringType, nullable = true), 
    StructField("d", DoubleType, nullable = true) 
)) 

private val inputDF: DataFrame = SparkCommons.sparkSession 
    .read 
    .schema(schema) 
    .json(inputJsonPath) 
    .cache() 

inputDF.printSchema() 

val dataRdd = inputDF.rdd 
.map { 
    case Row(a: String, b: String, c: String, d: Double) => 
    MyCaseClass(a, b, c, d) 
} 

val dataMap = dataRdd.collectAsMap() 

Der MyCaseClass Code:

case class MyCaseClass(
       a: String, 
       b: String, 
       c: String = null, 
       d: Double = Predef.Double2double(null) 
) 

ich folgendes Schema als Ausgabe erhalten: kompiliert

root 
|-- a: string (nullable = true) 
|-- b: string (nullable = true) 
|-- c: string (nullable = true) 
|-- d: double (nullable = true) 

Programm aber zur Laufzeit einmal Funken macht die Jobs Ich bekomme die folgende Ausnahme:

[error] - org.apache.spark.executor.Executor - Exception in task 3.0 in stage 4.0 (TID 21) 
scala.MatchError: [abc,bcd,null,3] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
at com.matteoguarnerio.spark.SparkOperations$$anonfun$1.apply(SparkOperations.scala:62) ~[classes/:na] 
at com.matteoguarnerio.spark.SparkOperations$$anonfun$1.apply(SparkOperations.scala:62) ~[classes/:na] 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ~[scala-library-2.11.11.jar:na] 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ~[scala-library-2.11.11.jar:na] 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ~[scala-library-2.11.11.jar:na] 
at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.RangePartitioner$$anonfun$9.apply(Partitioner.scala:261) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.RangePartitioner$$anonfun$9.apply(Partitioner.scala:259) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.scheduler.Task.run(Task.scala:86) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) ~[spark-core_2.11-2.0.2.jar:2.0.2] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144] 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144] 
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144] 

Spark-Version: 2.0.2

Scala Version: 2.11.11

  • Wie diese Ausnahme lösen und durchlaufen, auch wenn einige Felder null sind oder in der RDD Anpassung fehlt und die Schaffung von Objekte?
  • Warum ist das Schema, auch wenn ich in einigen Feldern explizit NULL-Werte und NULL-Werte definiert hat, alles NULL-fähig?

UPDATE

habe ich nur eine Abhilfe auf dataRdd das Problem zu vermeiden:

private val dataRdd = inputDF.rdd 
.map { 
    case r: GenericRowWithSchema => { 
     val a = r.getAs("a").asInstanceOf[String] 
     val b = r.getAs("b").asInstanceOf[String] 

     var c: Option[String] = None 
     var d: Option[Double] = None 

     try { 
     c = if (r.isNullAt(r.fieldIndex("c"))) None: Option[String] else Some(r.getAs("c").asInstanceOf[String]) 
     d = if (r.isNullAt(r.fieldIndex("d"))) None: Option[Double] else Some(r.getAs("d").asInstanceOf[Double]) 
     } catch { 
     case _: Throwable => None 
     } 

     MyCaseClass(a, b, c, d) 
    } 
} 

und geändert MyCaseClass auf diese Weise:

case class MyCaseClass(
       a: String, 
       b: String, 
       c: Option[String], 
       d: Option[Double] 
) 

Antwort

0

Das Problem ist, mit input.json. Es sollte so aussehen:

{"a": "abc", "b": "bcd", "d": 3}, 
{"a": "ezx", "b": "hdg", "c": "ssa"}, 
... 

Mit diesem input.json Ihr Code funktioniert gut.

+0

JSON wird bereits auch als Eigenschaftsschlüssel zitiert. Ich habe den Fehler gemacht, die Datei zu löschen, sie ist bereits in dieser Form und das Problem besteht immer noch. –

+0

Wie ich in meiner Antwort erwähnt habe - Ihr Code funktioniert gut! Es gibt kein Problem, es sei denn, in Ihrer Frage fehlt noch etwas. – himanshuIIITian

Verwandte Themen