5

Ich lese eine Datei (300.000 Zeilen) in node.js. Ich möchte Zeilen in Stapeln von 5.000 Zeilen an eine andere Anwendung (Elasticsearch) senden, um sie zu speichern. Also, wenn ich mit dem Lesen von 5.000 Zeilen fertig bin, möchte ich sie über eine API in großen Mengen an Elasticsearch senden, um sie zu speichern, und dann den Rest der Datei lesen und alle 5.000 Zeilen in großen Mengen senden.Wie liest man Zeilen einer Datei mit node.js oder Javascript mit Verzögerung, nicht im nicht blockierenden Verhalten?

Wenn ich will, Java verwenden (oder eine andere Sperr Sprache wie C, C++, Python, etc.) für diese Aufgabe, werde ich etwas tun:

int countLines = 0; 
String bulkString = ""; 
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt"))); 
while ((currentLine = br.readLine()) != null) { 
    countLines++; 
    bulkString += currentLine; 
    if(countLines >= 5000){ 
      //send bulkString to Elasticsearch via APIs 
      countLines = 0; 
      bulkString = ""; 
    } 
} 

Wenn ich will das gleiche tun mit node.js, das will ich tun:

var countLines = 0; 
var bulkString = ""; 
var instream = fs.createReadStream('filePath.txt'); 
var rl = readline.createInterface(instream, outstream); 
rl.on('line', function(line) { 
    if(countLines >= 5000){ 
      //send bulkString to via APIs 
      client.bulk({ 
      index: 'indexName', 
      type: 'type', 
      body: [bulkString] 
      }, function (error, response) { 
      //task is done 
      }); 
      countLines = 0; 
      bulkString = ""; 
    } 
} 

Das Problem mit node.js ist, dass es nicht blockierenden, damit es nicht zum ersten API-Antwort warten, bevor der nächste Senden Stapel von Zeilen. Ich weiß, dass dies als Vorteil für done.js gelten könnte, weil es nicht auf I/O wartet, sondern das Problem ist, dass es zu viele Daten an Elasticsearch sendet. Daher wird die Warteschlange des Elasticsearch voll und es werden Ausnahmen ausgelöst.

Meine Frage ist, wie kann ich die node.js auf die Antwort von der API warten, bevor es die nächsten Zeilen liest oder bevor es den nächsten Stapel von Zeilen an die Elasticsearch sendet.

Ich weiß, ich kann einige Parameter in Elasticsearch setzen, um die Größe der Warteschlange zu erhöhen, aber ich bin daran interessiert, das Verhalten von node.js für dieses Problem zu blockieren. Ich bin mit dem Konzept der Rückrufe vertraut, aber ich kann mir keine Möglichkeit vorstellen, in diesem Szenario Rückrufe zu verwenden, um zu verhindern, dass node.js die Elasticsearch-API im nicht blockierenden Modus aufruft.

Antwort

4

Pierre's Antwort ist korrekt. Ich möchte nur einen Code einreichen, der zeigt, wie wir von dem nicht-blockierenden Konzept der node.js profitieren können, aber gleichzeitig die Elasticsearch nicht mit zu vielen Anfragen gleichzeitig überfordern.

hier ein Pseudo-Code ist, dass Sie den Code, um eine Flexibilität zu geben, indem der Warteschlange Größenbeschränkung verwenden:

var countLines = 0; 
var bulkString = ""; 
var queueSize = 3;//maximum of 3 requests will be sent to the Elasticsearch server 
var batchesAlreadyInQueue = 0; 
var instream = fs.createReadStream('filePath.txt'); 
var rl = readline.createInterface(instream, outstream); 
rl.on('line', function(line) { 
    if(countLines >= 5000){ 
      //send bulkString to via APIs 
      client.bulk({ 
      index: 'indexName', 
      type: 'type', 
      body: [bulkString] 
      }, function (error, response) { 
       //task is done 
       batchesAlreadyInQueue--;//we will decrease a number of requests that are already sent to the Elasticsearch when we hear back from one of the requests 
       rl.resume(); 
      }); 
      if(batchesAlreadyInQueue >= queueSize){ 
       rl.pause(); 
      } 
      countLines = 0; 
      bulkString = ""; 
    } 
} 
2

Verwenden Sie rl.pause() direkt nach Ihrer if und rl.resume() nach Ihrer //task is done.

Beachten Sie, dass Sie nach dem Aufrufen der Pause möglicherweise noch ein paar Zeilenereignisse haben.

+0

Danke, für mich gearbeitet. – Soheil

Verwandte Themen