2016-04-16 10 views
2

Verwenden des elastischen Such-Clients NodeJS. Der Versuch, einen Datenimport zu schreiben, um Dokumente aus MongoDB zu importieren. Das Problem, das ich habe, ist die Indexaktualisierung scheint nicht zu warten, bis alle Dokumente in elastic geschrieben werden, bevor die Zählungen überprüft werden.So warten Sie, bis alle Bulk-Schreibvorgänge in der elastischen Such-API abgeschlossen sind

Verwenden Sie die Stream-API im Knoten, um die Datensätze in einen Stapel zu lesen, und verwenden Sie dann den elastischen API-Massenbefehl, um die Datensätze zu schreiben. Im Folgenden dargestellt:

function rebuildIndex(modelName, queryStream, openStream, done) { 
    logger.debug('Rebuilding %s index', modelName); 
    async.series([ 
     function (next) { 
      deleteType(modelName, function (err, result) { 
      next(err, result); 
      }); 
     }, 
     function (next) { 
      var Model; 
      var i = 0; 
      var batchSize = settings.indexBatchSize; 
      var batch = []; 
      var stream; 

      if (queryStream && !openStream) { 
      stream = queryStream.stream(); 
      } else if (queryStream && openStream) { 
      stream = queryStream; 
      }else 
      { 
      Model = mongoose.model(modelName); 
      stream = Model.find({}).stream(); 
      } 

      stream.on("data", function (doc) { 
      logger.debug('indexing %s', doc.userType); 
      batch.push({ 
       index: { 
       "_index": settings.index, 
       "_type": modelName.toLowerCase(), 
       "_id": doc._id.toString() 
       } 
      }); 
      var obj; 
      if (doc.toObject){ 
       obj = doc.toObject(); 
      }else{ 
       obj = doc; 
      } 
      obj = _.clone(obj); 

      delete obj._id; 
      batch.push(obj); 
      i++; 
      if (i % batchSize == 0) { 
       console.log(chalk.green('Loaded %s records'), i); 
       client().bulk({ 
       body: batch 
       }, function (err, resp) { 
       if (err) { 
        next(err); 
       } else if (resp.errors) { 
        next(resp); 
       } 
       }); 
       batch = []; 
      } 
      }); 

      // When the stream ends write the remaining records 
      stream.on("end", function() { 
      if (batch.length > 0) { 
       console.log(chalk.green('Loaded %s records'), batch.length/2); 
       client().bulk({ 
       body: batch 
       }, function (err, resp) { 
       if (err) { 
        logger.error(err, 'Failed to rebuild index'); 
        next(err); 
       } else if (resp.errors) { 
        logger.error(resp.errors, 'Failed to rebuild index'); 
        next(resp); 
       } else { 
        logger.debug('Completed rebuild of %s index', modelName); 
        next(); 
       } 
       }); 
      } else { 
       next(); 
      } 

      batch = []; 
      }) 
     } 

     ], 
     function (err) { 
     if (err) 
      logger.error(err); 
     done(err); 
     } 
    ); 
    } 

Ich benutze diesen Helfer, um die Anzahl der Dokumente im Index zu überprüfen. Ohne die Zeitüberschreitung sind die Zählungen im Index falsch, aber mit der Zeitüberschreitung sind sie in Ordnung.

/** 
    * A helper function to count the number of documents in the search index for a particular type. 
    * @param type The type, e.g. User, Customer etc. 
    * @param done A callback to report the count. 
    */ 
    function checkCount(type, done) { 
    async.series([ 
     function(next){ 
     setTimeout(next, 1500); 
     }, 
     function (next) { 
     refreshIndex(next); 
     }, 
     function (next) { 
     client().count({ 
      "index": settings.index, 
      "type": type.toLowerCase(), 
      "ignore": [404] 
     }, function (error, count) { 
      if (error) { 
      next(error); 
      } else { 
      next(error, count.count); 
      } 
     }); 
     } 
    ], function (err, count) { 
     if (err) 
     logger.error({"err": err}, "Could not check index counts."); 
     done(err, count[2]); 
    }); 
    } 

Und dieser Helfer ist soll den Index aktualisieren, nachdem das Update abgeschlossen ist:

// required to get results to show up immediately in tests. Otherwise there's a 1 second delay 
    // between adding an entry and it showing up in a search. 
    function refreshIndex(done) { 
    client().indices.refresh({ 
     "index": settings.index, 
     "ignore": [404] 
    }, function (error, response) { 
     if (error) { 
     done(error); 
     } else { 
     logger.debug("deleted index"); 
     done(); 
     } 
    }); 
    } 

Der Lader in Ordnung funktioniert, außer dieser Test nicht zwischen der Masse Last wegen des Timings und der Zählerprüfung :

it('should be able to rebuild and reindex customer data', function (done) { 
    this.timeout(0); // otherwise the stream reports a timeout error 
    logger.debug("Testing the customer reindexing process"); 

    // pass null to use the generic find all query 
    searchUtils.rebuildIndex("Customer", queryStream, false, function() { 
     searchUtils.checkCount("Customer", function (err, count) { 
     th.checkSystemErrors(err, count); 
     count.should.equal(volume.totalCustomers); 
     done(); 
     }) 
    }); 
    }); 

Ich beobachte zufällige Ergebnisse in den Zählungen aus den Tests. Mit der künstlichen Verzögerung (setTimeout in der checkCount-Funktion) stimmen die Zählwerte überein. So schließe ich, dass die Dokumente sind schließlich geschrieben, um elastisch und der Test würde bestanden. Ich dachte, die indices.refresh würde im Wesentlichen eine Wartezeit erzwingen, bis die Dokumente alle in den Index geschrieben sind, aber es scheint nicht mit diesem Ansatz zu arbeiten.

Der setTimeout-Hack ist nicht wirklich nachhaltig, wenn der Datenträger auf die tatsächliche Produktionsebene geht .... wie kann ich sicherstellen, dass die Massenaufrufe vollständig in den elastischen Index geschrieben werden, bevor die Anzahl der Dokumente überprüft wird?

+0

Sind Sie die Methoden (Refresh und zählen) sicher sind [sequenziell ausgeführt] (http: //stackoverflow.com/questions/15969082/node-js-async-series-is-that-how-it-is-supposed-to-work) und nicht gleichzeitig? –

+0

Hallo, Ja, ich bin ziemlich sicher, dass es in Ordnung ist. Der dritte Codeblock oben zeigt die Verwendung von 'async.series'. Ich übergebe die interne nächste Funktion und benutze das als Rückruf. Die Frage, die Sie mit dem Typ verknüpften, definierte den Rückruf neu und wertete die Funktion sofort aus. Für meine ist die Funktionsdefinition nur 'inline', anstatt eine externe Funktion aufzurufen. –

Antwort

1

Werfen Sie einen Blick auf die "refresh" Parameter (elasticsearch documentation)

Zum Beispiel:

let bulkUpdatesBody = [ bulk actions/docs to index go here ] 
client.bulk({ 
    refresh: "wait_for", 
    body: bulkUpdatesBody 
}); 
+0

Ich habe mich gefragt, warum _stats nach der Verwendung der Bulk-API ein JSON mit einer falschen Anzahl für den Index, den ich gerade erstellt habe, zurückgegeben hat. Dies half mir, mein Problem zu lösen, nicht sicher, ob es die ursprüngliche Frage hilft. – William

0

Ich bin mir nicht sicher, ob das die Antwort ist oder nicht - aber ich habe den Index vor der Zählung geleert. Es "scheint" zu funktionieren, aber ich weiß nicht, ob es nur wegen des Timings zwischen den Anrufen ist. Vielleicht weiß jemand vom elastischen Team, ob das Löschen des Index wirklich das Problem lösen wird?

function checkCount(type, done) { 
    async.series([ 
     function(next) { 
     client().indices.flush({ 
      "index": settings.index, 
      "ignore": [404] 
     }, function (error, count) { 
      if (error) { 
      next(error); 
      } else { 
      next(error, count.count); 
      } 
     }); 
     }, 
     function (next) { 
     refreshIndex(type, next); 
     }, 
     function (next) { 
     client().count({ 
      "index": settings.index, 
      "type": type.toLowerCase(), 
      "ignore": [404] 
     }, function (error, count) { 
      if (error) { 
      next(error); 
      } else { 
      next(error, count.count); 
      } 
     }); 
     } 
    ], function (err, count) { 
     if (err) 
     logger.error({"err": err}, "Could not check index counts."); 
     done(err, count[2]); 
    }); 
    } 
Verwandte Themen