2017-11-22 1 views
2

Ich habe eine Java-Anwendung, die einen Kafka-Stream von avro-Nachrichten verarbeitet und für jede Nachricht eine Abfrage in einer mongoDB-Sammlung ausführt.mongoDB & Spark: "com.mongodb.MongoSocketReadException: Vorzeitiges Ende des Streams erreicht"

Nachdem einige Dutzend Nachrichten ordnungsgemäß verarbeitet wurden, wird die Anwendung nicht mehr ausgeführt und "com.mongodb.MongoSocketReadException: Vorzeitiges Ende des Streams erreicht" wird ausgelöst.

Hier ist der Code:

JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, 
      String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics); 

    directKafkaStream.foreachRDD(rdd ->{ 

     rdd.foreach(avroRecord -> { 

      byte[] encodedAvroData = avroRecord._2; 
      LocationType t = deserialize(encodedAvroData); 

      MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder(); 
      options_builder.maxConnectionIdleTime(60000); 
      MongoClientOptions options = options_builder.build(); 
      MongoClient mongo = new MongoClient ("localhost:27017", options); 

      MongoDatabase database = mongo.getDatabase("DB"); 
      MongoCollection<Document> collection = database.getCollection("collection"); 

      Document myDoc = collection.find(eq("key", 4)).first(); 
      System.out.println(myDoc); 

     }); 
    }); 

Antwort

0

Zuerst Sie keine Mongo Verbindung für jeder Datensatz öffnen sollte! Dann solltest du deine Mongo-Verbindung schließen.

Mongo mag nicht, wenn Sie viele (Hunderte, Tausende?) Öffnen, ohne sie zu schließen. Hier

ist ein exemple von dem, was Sie tun können, Mongo-Verbindung über eine RDD zu öffnen:

directKafkaStream.foreachRDD(rdd ->{ 
    rdd.foreachPartition(it -> { 

     // Opens only 1 connection per partition 
     MongoClient mongo = new MongoClient ("localhost:27017"); 
     MongoDatabase database = mongo.getDatabase("DB"); 
     MongoCollection<Document> collection = database.getCollection("collection"); 

     while (it.hasNext()) { 
      byte[] encodedAvroData = it.next()._2; 
      LocationType t = deserialize(encodedAvroData); 

      Document myDoc = collection.find(eq("key", 4)).first(); 
      System.out.println(myDoc); 
     } 

     mongo.close(); 
    }); 
}); 
Verwandte Themen