2017-12-18 3 views
0

Ich versuche, ein Spark Streaming mit Kafka-Anwendung einschließlich Fehlertoleranz zu implementieren. Wenn ich die Anwendung neu starte, liest sie die Nachrichten, die bereits vor dem Neustart gelesen wurden, und meine Berechnungen sind falsch verlaufen. Bitte helfen Sie mir, dieses Problem zu lösen.Spark Streaming Checkpoints lesen nach Fehler

Hier ist der Code in Java geschrieben.

public static JavaStreamingContext createContextFunc() { 

    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints(); 

    ApplicationConf conf = new ApplicationConf(); 
    String checkpointDir = conf.getCheckpointDirectory(); 

    JavaStreamingContext streamingContext = app.getStreamingContext(checkpointDir); 

    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext); 

    return streamingContext; 
} 


public static void main(String[] args) throws InterruptedException { 

    String checkpointDir = conf.getCheckpointDirectory(); 

    Function0<JavaStreamingContext> createContextFunc =() -> createContextFunc(); 
    JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc); 

    streamingContext.start(); 
    streamingContext.awaitTermination(); 

} 

public JavaStreamingContext getStreamingContext(String checkpointDir) { 

    ApplicationConf conf = new ApplicationConf(); 
    String appName = conf.getAppName(); 
    String master = conf.getMaster(); 
    int duration = conf.getDuration(); 

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master); 
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true"); 

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration)); 
    streamingContext.checkpoint(checkpointDir); 

    return streamingContext; 
} 

public SparkSession getSession() { 

    ApplicationConf conf = new ApplicationConf(); 
    String appName = conf.getAppName(); 
    String hiveConf = conf.getHiveConf(); 
    String thriftConf = conf.getThriftConf(); 
    int shufflePartitions = conf.getShuffle(); 

    SparkSession spark = SparkSession 
      .builder() 
      .appName(appName) 
      .config("spark.sql.warehouse.dir", hiveConf) 
      .config("hive.metastore.uris", thriftConf) 
      .enableHiveSupport() 
      .getOrCreate(); 

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions); 
    return spark; 

} 


public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) { 

    KafkaConfig kafkaConfig = new KafkaConfig(); 
    Set<String> topicsSet = kafkaConfig.getTopicSet(); 
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams(); 

    // Create direct kafka stream with brokers and topics 
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
      streamingContext, 
      LocationStrategies.PreferConsistent(), 
      ConsumerStrategies.Subscribe(topicsSet, kafkaParams)); 

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value); 

    return logdata; 
} 

Hier ist der Link zum Github-Projekt.

Antwort

0

Ich habe das Problem durch Hinzufügen der folgenden Konfiguration im Code behoben.

sparkConf.set(“spark.streaming.stopGracefullyOnShutdown","true")