1

Ich schreibe eine Spark-Streaming-Anwendung mit Spark 1.6.0 auf einem CDH 5.8.3-Cluster. Die Anwendung ist sehr einfach: Sie liest von Kafka, macht einige Transformationen der DStream/RDDs und gibt sie dann in eine Hive-Tabelle aus. Ich habe auch versucht, einen dummen Beispielcode mit dem sqlContext zu schreiben, aber der Fehler ist immer noch da.Spark Streaming HiveContext NullPointerException

Mein Problem ist, dass ich einen HiveContext in der ForeachRDD-Anweisung des DStream nicht verwenden kann.

Mein Code sieht wie folgt aus:

val sc = new SparkContext() 
val sqlContext = new HiveContext(sc) 
val ssc = new StreamingContext(sc, Minutes(sparkBatchInterval)) 
ssc.checkpoint(CHECKPOINT_DIR) 
ssc.sparkContext.setLogLevel("WARN") 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokersList, "auto.offset.reset" -> "smallest") 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic)) 
val validatedAndPersisted = dstream.transform(rdd => {...}).persist(StorageLevel.MEMORY_AND_DISK_SER) 
    val recordsToBeIngested = ... 
    recordsToBeIngested.foreachRDD(rdd=> { 
    rdd.persist(StorageLevel.MEMORY_AND_DISK) 

    val ingestCount = rdd.count 
    if(ingestCount>0) { 
    sqlContext.tables("sc4").show() //here actually I shoud have a insertInto 
    } 
} 

Und die Fehler, die ich bekommen, ist dieses:

Exception in thread "main" java.lang.NullPointerException 
    at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205) 
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554) 
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553) 
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:540) 
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:539) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:539) 
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:252) 
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:239) 
    at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:459) 
    at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:459) 
    at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:458) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:475) 
    at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:475) 
    at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:474) 
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) 
    at org.apache.spark.sql.SQLContext.tables(SQLContext.scala:855) 
    at myPackage.Ingestion$$anonfun$createStreamingContext$1.apply(Ingestion.scala:173) 
    at myPackage.Ingestion$$anonfun$createStreamingContext$1.apply(Ingestion.scala:166) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Haben Sie eine Idee haben, über die der Grund für diesen Fehler sein kann, oder wie könnte Ich befestige es?

Danke, Marco

Antwort

1

ich selbst die Antwort gefunden. Das Problem war aufgrund der Tatsache, dass ich den HiveContext vor dem StreamingContext erstellt habe. Durch Verschieben der Erstellung nach der StreamingContext-Erstellung wurde das Problem behoben.