2017-08-31 6 views
2

Ich versuche, unten mit dem Code eine Datensammlung, um mehrere Dateien zu streamen:Node - Lesbare Stromrohr() vorherige Ströme in einer for-Schleife überschreiben

for (var key in data) { 
    // skip if collection length is 0 
    if (data[key].length > 0) { 
    // Use the key and jobId to open file for appending 
    let filePath = folderPath + '/' + key + '_' + jobId + '.txt'; 

    // Using stream to append the data output to file, which should perform better when file gets big 
    let rs = new Readable(); 
    let n = data[key].length; 
    let i = 0; 

    rs._read = function() { 
     rs.push(data[key][i++]); 

     if (i === n) { 
     rs.push(null); 
     } 
    }; 

    rs.pipe(fs.createWriteStream(filePath, {flags: 'a', encoding: 'utf-8'})); 

    } 
} 

Allerdings habe ich am Ende alle Dateien immer sein bevölkerten mit den gleichen Daten, die das Array für den letzten Schlüssel in data Objekt ist. Es scheint, dass der Lesestream für jede Schleife außer Kraft gesetzt wird, und der pipe() zum schreibbaren Datenstrom startet erst, wenn die for-Schleife beendet ist. Wie ist das möglich?

+0

für var in der ersten Schleife zu ändern versuchen – Kieper

Antwort

1

Also der Grund, warum Sie Code ist wahrscheinlich nicht funktioniert, weil rs._read-Methode asynchron aufgerufen wird, und Ihre Schlüsselvariable ist Funktionsbereich (wegen var-Schlüsselwort).

Jeder von Ihnen erstellte rs-Stream verweist auf dieselbe Variable, die der Schlüssel ist. Am Ende der Hauptschleife hat jeder dieser Callbacks den gleichen Wert. Wenn Sie "var" in "let" ändern, wird in jeder Iteration eine neue Schlüsselvariable erstellt, die Ihr Problem löst (die _read-Funktion hat eine eigene Kopie der Schlüsselvariablen statt der gemeinsamen).

Wenn Sie es ändern, damit es funktionieren sollte.

+0

Dank zu lassen, habe ich frage mich schon immer Was ist der Unterschied zwischen var und let. Das war in der Tat das Problem. Es schien kein Problem zu sein, wenn blockierender Code synchron ausgeführt wurde, bis in diese Situation hineingelaufen ist. Es sieht also so aus, als würden alle Callbacks warten, bis die Schleife abgeschlossen ist. – StephenK

0

Dies passiert, weil die key, die Sie in der Schleifenanweisung definieren, nicht blockübergreifend ist. Dies ist zunächst kein Problem, aber wenn Sie innerhalb der rs._read-Funktion einen Abschluss erstellen, verwenden alle nachfolgenden Datenstromlesevorgänge den letzten bekannten Wert, der der letzte Wert des Arrays data ist.

Und wenn wir schon dabei, ich habe ein bisschen ein Refactoring vorschlagen kann, um den Code sauberer zu machen und wiederverwendbar:

const writeStream = (folderPath, index, jobId) => { 
    const filePath = `${folderPath}/${index}_${jobId}.txt`; 

    return fs.createWriteStream(filePath, { 
     flags: 'a', encoding: 'utf-8' 
    }); 
} 

data.forEach((value, index) => { 
    const length = value.length; 

    if (length > 0) { 
     const rs = new Readable(); 
     const n = length; 

     let i = 0; 

     rs._read =() => { 
      rs.push(value[i++]); 
      if (i === n) rs.push(null); 
     } 

     rs.pipe(writeStream(folderPath, index, jobId)); 
    } 
}); 
+0

Danke. Ein Neuling zum Knoten, fühle ich mich immer noch um die Rückrufe und Grammatiken herum. Wie werden die Lese- und Schreib-Stream-Objekte in diesem Async-Szenario gesammelt? Würde das Ende des Stream-Zeichens (Null) das Schließen dieser Objekte auslösen? Wenn ich beispielsweise diese Ereignisse so verketten möchte, wird die Dateikopierfunktion nicht ausgeführt, bis alle Dateien geschrieben sind. Wie erreiche ich das? – StephenK

Verwandte Themen