2017-01-24 3 views
2

Ich spiele mit Streams und async/erwarten Funktionalität. Was ich bisher habe ist:warten auf Funktion mit Rückruf

let logRecord = ((record, callback) => { 
    console.log(record); 
    return callback(); 
}); 

let importCSVfromPath = async((csv_path) => { 
    return new Promise(function(resolve, reject) { 
     var parser = parse(); 
     var input = fs.createReadStream(csv_path); 
     var transformer = transform(logRecord, {parallel: 1}); 

     input.on('error', (err) => { 
      reject(err); 
     }); 
     input.on('finish',()=> { 
      resolve(); 
     }); 

     input.pipe(parser).pipe(transformer); 
    }); 
}); 

Jetzt möchte ich LogRecord mit ImportRecord ersetzen. Das Problem ist, dass diese Funktion Funktionen verwenden muss, die bereits Teil des Async-Stacks sind.

let importRecord = async((record) => { 
    ....... 
    await(insertRow(row)); 
}); 

Was ist der richtige Weg, dies zu tun?

+0

Was bietet die "Async" - und "erwarten" -Funktionen hier? – lonesomeday

Antwort

1

Es ist etwas komplizierter als das - node.js Streams sind nicht angepasst (zumindest noch nicht) zu den es7 async/await Methoden.

Wenn Sie dies selbst entwickeln möchten, sollten Sie eine von Readable stream abgeleitete Klasse schreiben. Die Implementierung einer auf Versprechen basierenden Schnittstelle ist eine ziemliche Aufgabe, aber es ist möglich.

Wenn Sie jedoch mit einem permissiven lizenzierten Framework einverstanden sind, werfen Sie einen Blick auf Scramjet. Mit ihm wird der Code wie folgt aussehen (die meisten dem Beispiel wird die CSV-Parsing - Ich werde einen Helfer in der nächsten Version hinzugefügt werden):

fs.createReadStream("file.csv")  // open your file 
    .pipe(new StringStream())   // pass to scramjet 
    .split("\n")      // split by line 
    .parse((line) => line.split(",")) // convert lines to arrays 
    .map(async (line) => {    // run asynchrounous mapping 
     await importRecord(line);  // import log to DB 
     return logRecord(line);  // return some log for the output 
    }) 
    .pipe(process.stdout);    // pipe the output wherever you like 

Ich glaube, es ist genau das, was Sie suchen und es wird Führen Sie Ihre Datensatzimporte parallel aus, während Sie die Ausgabefolge beibehalten.