2016-05-16 3 views
1

Ich möchte große Datei Zeile für Zeile lesen und fügen Sie die Daten in DB speichern. Mein functon gibt ein Promise zurück, in dem es erstellt wurde, und löste es, wenn Ereignis aufgerufen wurde, aber ich möchte es nicht wirklich, weil es in stream.on('data')Promise.map() auf jeder Zeile produziert, und ich möchte sicher sein, dass alle Einfügeoperationen abgeschlossen sind vor resolve() aufgerufen. Wie kann ich in diesem Fall die richtige Kette herstellen?Promise kombinieren Stream

var loadFromFile = (options) => new Promise(function (resolve, reject) { 
     let stream = fs.createReadStream(options.filePath, { 
      flags: 'r', 
     }); 

     stream.on('data', (chunk) => { 
      /* process data chunk to string Array */ 
      Promise.map(lines, (line) => { 
      /* process and insert each line here */ 
      }) 
      .catch((err)=>{ 
       reject(err); 
      }); 
      }); 

      stream.on('end',() => { 
       if (someBuisnessLogicCheckHere) { 
       reject('Invalid data was found in file'); 
       } 
       resolve(); // Here I am not sure, that all inserts for each chunk in Promise.map was completed 
      }); 
    }); 
+1

Sie Code benötigen und diese dann viel klarer zu sein. –

+0

fügen Sie etwas Code, es ist einfach, wie ich früh – Dmitriy

+0

@ Dmitriy beschreiben: Und doch, zitieren Sie es lassen Sie mich Ihre Frage beantworten. Code ist 1024 Wörter wert! –

Antwort

3

Wenn Sie sicher sein wollen Sie für das Versprechen nicht, bis alle der Versprechungen in der Mapping-Operation zu beheben haben beschlossen, warten, dass Promise.map zurückkehrt; siehe Kommentar:

var loadFromFile = (options) => new Promise(function(resolve, reject) { 
    let stream = fs.createReadStream(options.filePath, { 
     flags: 'r', 
    }); 
    let promises = [];         // Array of promises we'll wait for 

    stream.on('data', (chunk) => { 
     promises.push(        // Remember this promise 
      Promise.map(lines, (line) => { 
       /* process and insert each line here */ 
      }) 
      .catch((err) => { 
       reject(err); 
      }) 
     ); 
    }); 

    stream.on('end',() => { 
     if (someBuisnessLogicCheckHere) { 
      reject('Invalid data was found in file'); 
     } 
     Promise.all(promises).then(() => { resolve()}); // Wait for it before resolving 
    }); 
}); 

Bitte beachte, dass ich nicht nur Promise.all(promises).then(resolve); tun, denn das wird eine Reihe von Auflösungen resolve geben, die Ihr ursprünglicher Code nicht mit dem Anrufer nicht geteilt.

Wenn es eine Möglichkeit gibt, dass das Array ziemlich groß wird, während es mit den meisten bereits aufgelösten Versprechungen gefüllt wird, könnten Sie sie proaktiv entfernen, wenn sie auflösen, und nur auf diejenigen warten, die am Ende übrig sind.

+0

danke, ich versuche, das gleiche zu tun, aber eine Frage hier: Ich möchte lesen und Einfügen von Daten gleichzeitig, in diesem Fall Aufträge in Versprechen Array wird warten, bevor die Datei gelesen wurde Ende und Aufruf "Promise.all" func? Ich bin neu in Versprechen, tut mir leid, wenn es dumm Frage – Dmitriy

+0

@ Dmitriy: Nein, sie werden nicht warten. Das Versprechen ist nur ein Benachrichtigungsmechanismus. Die Arbeit innerhalb des Versprechens geschieht unabhängig davon, ob jemand es beobachtet. –

0

Wenn Sie die Datei mit automatisierten Mechanismen der Pause/Fortsetzen in der Pipe-Methode verarbeiten müssen, sehen Sie sich scramjet.

Ihr Beispiel würde viel klarer sehen:

var loadFromFile = (options) => 
    fs.createReadStream(options.filePath, { 
     flags: 'r', 
    }) 
    .split(/\r?\n/) 
     // split by line by regex 
    .parse((line) => aPromiseReturningFunction(line)) 
     // will wait until promises are resolved 
    .map((data) => doSomeAsyncAndReturnPromise(data))  
     // will wait as above 
    .accumulate((acc, resolved) => acc.push(resolved), []) 
    .then((arr) => { 
     if (someBuisnessLogicCheckHere) { 
       return Promise.reject('Invalid data was found in file'); 
     } 
     // Here all your items are saved (i.e. all Promises resolved) 
    }); 
});