2017-02-16 3 views
1

Wir erstellen einfache Streaming-Anwendung, die HBase RDD verwendet, um mit eingehenden DStream beizutreten. Beispielcode:Apache Spark: NPE beim Wiederherstellen des Status von Prüfpunkt

val indexState = sc.newAPIHadoopRDD(
    conf, 
    classOf[TableInputFormat], 
    classOf[ImmutableBytesWritable], 
    classOf[Result]).map { case (rowkey, v) => //some logic} 

val result = dStream.transform { rdd => 
    rdd.leftOuterJoin(indexState) 
} 

Es funktioniert gut, aber wenn wir von Prüfpunkten für den Streaming ermöglichen und lassen Sie die Anwendung von einem zuvor erstellten Kontrollpunkt zu erholen, es wirft immer Nullpointer.

ERROR streaming.StreamingContext: Error starting the context, marking it as stopped 
java.lang.NullPointerException 
     at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119) 
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 

Hat jemand die gleichen Probleme konfrontiert? Versionen:

  • Funken 1.6.x
  • Hadoop 2.7.x

Dank!

+0

Wenn Sie sagen, ‚vorher Kontrollpunkt erstellt‘ ist, dass der Streaming-Job bedeuten wurde gestoppt und erneut eingereicht? – ImDarrenG

Antwort

1

Spark-Streaming-Checkpoints können nicht zur Wiederherstellung von früheren Jobs verwendet werden, zumindest nicht in Version 1.6.x. Wenn Ihr Job angehalten und erneut gesendet wird, können die Prüfpunktdaten nicht erneut verwendet werden. Sie müssen alle alten Prüfpunktdaten vor dem Senden des Jobs löschen.

[R] von früheren Checkpoint-Informationen des vor der Aktualisierung Code kann nicht getan werden. Die Checkpoint-Information enthält im Wesentlichen serialisierte Scala/Java/Python-Objekte und der Versuch, Objekte mit neuen, modifizierten Klassen zu deserialisieren, kann zu Fehlern führen. Starten Sie in diesem Fall die aktualisierte App mit einem anderen Prüfpunktverzeichnis oder löschen Sie das vorherige Prüfpunktverzeichnis.

Upgrading the code - checkpointing

+0

Bedeutet es, dass Checkpoints nur für dstream funktionieren und wir sie nicht verwenden können, wenn wir mit irgendeiner Seite von rdd arbeiten? –

+0

Ihre Verwendung ist in Ordnung, aber Checkpointing ermöglicht die Wiederherstellung des Treibers. Es unterstützt nicht das Stoppen und Starten des gesamten Streaming-Jobs über Spark-Submit. – ImDarrenG

+0

* Ich fand, dass das gleiche auch beim Neustart des Streaming-Jobs ohne Code-Änderungen gehalten wurde * Das ist falsch. Es ist kein Problem, einen fehlgeschlagenen Job mit vorhandenen Daten neu zu starten, solange keine Änderungen vorgenommen wurden. –

Verwandte Themen