2017-03-07 3 views
3

Ich habe eine Spark-Anwendung, die RDD-Daten in MongoDB schreibt und ich bekomme eine MongoBulkWriteException. Früher habe ich die BulkWrite() Methode von MongoDB Standard-Treiber, , aber ich habe begonnen, die write() Methode aus dem MongoSpark-Treiber.MongoSpark doppelten Schlüsselfehler speichern E11000

Bevor irgendetwas anderes, ich bin mit Apache Spark 1.6.0 und MongoDB 3.2.11.

Diese Ausnahme-Trace:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server 
10.1.101.146:27017. Write errors: [BulkWriteError{index=0, code=11000, 
message='E11000 duplicate key error collection: collection-test 
index: _id_ dup key: { : "636253651-2017-03-07" }', details={ }}] 

Der Code, der es produziert ist:

JavaRDD<Document> rddInsertRecords = rddGrouped.map(new Function<Tuple2<String, BasicRecord>, Document>() { 
private static final long serialVersionUID = 1L; 
    @Override 
    public Document call(Tuple2<String, BasicRecord> tuple2) throws Exception { 
      Document json = tuple2._2.toBSONDocument(); 
      return json; 
     } 
}); 
MongoSpark.save(rddInsertRecords, WriteConfig.create(sc.getConf())); 

Ich habe eine alternative Lösung meinen alten Code verwenden, aber ich möchte mit MongoSpark schreiben.

Ich habe dieses Problem in MongoDB JIRA (https://jira.mongodb.org/browse/SERVER-14322) gesehen, aber ich bin mir nicht sicher, wie kann ich das Problem umgehen.

UPDATE: Ich habe vergessen zu erwähnen, dass der Fehler nicht beim ersten Mal passiert (d. H. Keine Daten auf mongodb, die Sammlung ist leer). Es schlägt fehl, wenn der Job ein zweites Mal ausgeführt wird. Technisch sollte der Fahrer einen Upsert machen, habe ich recht?

Antwort

2

Der Spark-Connector weiß nicht, wie man RDD<T> updaten kann, wobei T jeder Typ sein kann - wie kann es den ID-Wert bekommen?

Datensätze/DataFrames enthalten jedoch Schemainformationen, die angeben, welches Feld das Feld _id ist, und können automatisch für Upserts verwendet werden. Dies wurde in SPARK-66 getan. Ein weiterer Vorteil von Datasets/DataFrames ist, dass sie effizienter sind und Ihren Spark-Jobs einen Leistungsschub verleihen.

Wenn Sie RDDs verwenden müssen, können Sie programmatisch auf die MongoDB-Sammlung zugreifen und eine Upsert-Operation über die Klasse MongoConnector erstellen.

+0

Roger das. Ich kann RDDs im Moment nicht auf Datasets umstellen, daher denke ich, dass ich den MongoConnector-Ansatz verwenden werde. Danke für die Klarstellung. – cabreracanal

Verwandte Themen