Ich habe eine Spark-Anwendung, die RDD-Daten in MongoDB schreibt und ich bekomme eine MongoBulkWriteException. Früher habe ich die BulkWrite() Methode von MongoDB Standard-Treiber, , aber ich habe begonnen, die write() Methode aus dem MongoSpark-Treiber.MongoSpark doppelten Schlüsselfehler speichern E11000
Bevor irgendetwas anderes, ich bin mit Apache Spark 1.6.0 und MongoDB 3.2.11.
Diese Ausnahme-Trace:
com.mongodb.MongoBulkWriteException: Bulk write operation error on server
10.1.101.146:27017. Write errors: [BulkWriteError{index=0, code=11000,
message='E11000 duplicate key error collection: collection-test
index: _id_ dup key: { : "636253651-2017-03-07" }', details={ }}]
Der Code, der es produziert ist:
JavaRDD<Document> rddInsertRecords = rddGrouped.map(new Function<Tuple2<String, BasicRecord>, Document>() {
private static final long serialVersionUID = 1L;
@Override
public Document call(Tuple2<String, BasicRecord> tuple2) throws Exception {
Document json = tuple2._2.toBSONDocument();
return json;
}
});
MongoSpark.save(rddInsertRecords, WriteConfig.create(sc.getConf()));
Ich habe eine alternative Lösung meinen alten Code verwenden, aber ich möchte mit MongoSpark schreiben.
Ich habe dieses Problem in MongoDB JIRA (https://jira.mongodb.org/browse/SERVER-14322) gesehen, aber ich bin mir nicht sicher, wie kann ich das Problem umgehen.
UPDATE: Ich habe vergessen zu erwähnen, dass der Fehler nicht beim ersten Mal passiert (d. H. Keine Daten auf mongodb, die Sammlung ist leer). Es schlägt fehl, wenn der Job ein zweites Mal ausgeführt wird. Technisch sollte der Fahrer einen Upsert machen, habe ich recht?
Roger das. Ich kann RDDs im Moment nicht auf Datasets umstellen, daher denke ich, dass ich den MongoConnector-Ansatz verwenden werde. Danke für die Klarstellung. – cabreracanal