2017-08-22 3 views
1

Ich habe einen Code, der aussieht wie untenjava.lang.NoClassDefFoundError: Es konnte keine Klasse initialisieren, wenn Funken Job über Start Funken einreichen in scala Code

 
object ErrorTest { 
case class APIResults(status:String, col_1:Long, col_2:Double, ...) 

def funcA(rows:ArrayBuffer[Row])(implicit defaultFormats:DefaultFormats):ArrayBuffer[APIResults] = { 
    //call some API ang get results and return APIResults 
    ... 
} 

// MARK: load properties 
val props = loadProperties() 
private def loadProperties(): Properties = { 
    val configFile = new File("config.properties") 
    val reader = new FileReader(configFile) 
    val props = new Properties() 
    props.load(reader) 
    props 
} 

def main(args: Array[String]): Unit = { 
    val prop_a = props.getProperty("prop_a") 

    val session = Context.initialSparkSession(); 
    import session.implicits._ 

    val initialSet = ArrayBuffer.empty[Row] 
    val addToSet = (s: ArrayBuffer[Row], v: Row) => (s += v) 
    val mergePartitionSets = (p1: ArrayBuffer[Row], p2: ArrayBuffer[Row]) => (p1 ++= p2) 

    val sql1 = 
    s""" 
     select * from tbl_a where ... 
    """ 

    session.sql(sql1) 
    .rdd.map{row => {implicit val formats = DefaultFormats; (row.getLong(6), row)}} 
    .aggregateByKey(initialSet)(addToSet,mergePartitionSets) 
    .repartition(40) 
    .map{case (rowNumber,rows) => {implicit val formats = DefaultFormats; funcA(rows)}} 
    .flatMap(x => x) 
    .toDF() 
    .write.mode(SaveMode.Overwrite).saveAsTable("tbl_b") 
    } 
} 

, wenn ich es über spark-submit laufen, wirft es Fehler Verursacht von: java.lang.NoClassDefFoundError: Die Klasse staging_jobs.ErrorTest $ konnte nicht initialisiert werden. Aber wenn ich val props = loadProperties() in die erste Zeile der main Methode verschiebe, dann gibt es keinen Fehler mehr. Könnte mir jemand eine Erklärung zu diesem Phänomen geben? Danke vielmals!

Caused by: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$ 
    at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208) 
    at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193) 
    ... 8 more 

Antwort

0

Ich habe die gleiche Frage wie Sie getroffen. Ich habe eine Methode convert außerhalb main Methode definiert. Wenn ich es mit dataframe.rdd.map{x => convert(x)} in main benutze, passiert NoClassDefFoundError:Could not initialize class Test$ passiert.

Aber wenn ich ein Funktionsobjekt convertor, das ist der gleiche Code mit convert Methode, in main Methode, ist kein Fehler passiert.

Ich verwendete funken 2.1.0, scala 2.11, es scheint wie ein Fehler in Funken?

0

Ich schätze das Problem ist, dass val props = loadProperties() ein Mitglied für die äußere Klasse (von Main) definiert. Dann wird dieses Mitglied auf den Executoren serialisiert (oder ausgeführt), die nicht die sichere Umgebung mit dem Treiber haben.

Verwandte Themen