Ich habe eine Java-Anwendung, die einen Kafka-Stream von avro-Nachrichten verarbeitet und für jede Nachricht eine Abfrage in einer mongoDB-Sammlung ausführt.mongoDB & Spark: "com.mongodb.MongoSocketReadException: Vorzeitiges Ende des Streams erreicht"
Nachdem einige Dutzend Nachrichten ordnungsgemäß verarbeitet wurden, wird die Anwendung nicht mehr ausgeführt und "com.mongodb.MongoSocketReadException: Vorzeitiges Ende des Streams erreicht" wird ausgelöst.
Hier ist der Code:
JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
LocationType t = deserialize(encodedAvroData);
MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
options_builder.maxConnectionIdleTime(60000);
MongoClientOptions options = options_builder.build();
MongoClient mongo = new MongoClient ("localhost:27017", options);
MongoDatabase database = mongo.getDatabase("DB");
MongoCollection<Document> collection = database.getCollection("collection");
Document myDoc = collection.find(eq("key", 4)).first();
System.out.println(myDoc);
});
});