2016-04-12 10 views
8

Gibt es eine Möglichkeit, einen lesbaren Stream mit einem schreibbaren Datenstrom in Node.js zu verbinden, in dem der Schreibzugriff noch nicht bereit ist, Daten zu empfangen? Mit anderen Worten, ich möchte das Lesbare mit dem Schreibbaren verbinden, aber ich möchte das Schreibfähige initialisieren, einschließlich der Definition der Schreibmethode, zu einem späteren Zeitpunkt im Programm. Vielleicht müssen wir die Write-Methode implementieren, aber gibt es eine Möglichkeit, einen schreibbaren Stream in ähnlicher Weise anzuhalten, in der Sie einen lesbaren Stream pausieren können? Oder vielleicht können wir einen intermediären Through-/Transform-Stream verwenden und die Daten dort puffern, bevor wir die Daten an das Schreibfähige weiterleiten!Piping von Daten in schreibbaren Datenstrom, der noch nicht bereit ist, Daten zu empfangen

Als ein Beispiel, in der Regel tun wir:

readable.pipe(transform).pipe(writable); 

aber ich möchte, wie etwas tun:

const tstrm = readable.pipe(transform); 

doSomethingAsync().then(function(){ 

     tstrm.pipe(writable); 

}); 

nur fragen, ob dies möglich ist und wie man es richtig zu tun, so weit mit Probleme herauszufinden, beides.

Ich denke, ich suche die Daten in einem intermediären Transformations-Stream zu puffern, bevor es mit einem schreibbaren Stream verbunden wird und anschließend, sobald es verbunden ist, die gepufferten Daten zuerst vor neuen Daten streamen. Scheint wie eine vernünftige Sache zu tun, kann keine Informationen dazu finden.

+0

sieht aus wie die akzeptierte Antwort hier ist in der Nähe zu dem, was ich für http://stackoverflow.com/questions/20317759/implementing-a-buffered-transform-stream –

Antwort

2

Hinweis, ich verwende hier ein Intervall, um nachzuahmen, ob der Schreiber lesen kann oder nicht. Sie können dies tun wie du also wollen, wenn der Schreiber false zurückgibt Sie den Status aktualisieren würde usw. zu beginnen Pufferung ich denke, die letzte Zeile ist, was Sie also

r.pipe(b).pipe(w); 

Dies wollen liest als

readStrem.pipe(transformBbuffer).pipe(writeStream); 
folgt

Der Beispielcode, es gibt einige Änderungen, die wir machen können, um alle Daten zu puffern. Ich werde nach dem Code beschreiben. Alles, was Sie über Ströme wissen müssen und mehr sind in der Dokumentation, ich glaube, sie mit vollständigeren Beispiele tun könnte, aber sie sind ziemlich gut wie ...

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

Dies ist der Code.

var fs  = require('fs'); 
var stream = require('stream') 
const util = require('util'); 
//const StringDecoder = require('string_decoder').StringDecoder; 
const Transform = require('stream').Transform; 
var check_buff = 0; 
var DRAIN_ME = 0; 

var r = fs.createReadStream('file1.txt').setEncoding('utf8'); 
var w = fs.createWriteStream('file2.txt'); 

var BufferStream = function() { 
    stream.Transform.apply(this, arguments); 
    this.buffer = []; 
}; 

util.inherits(BufferStream, stream.Transform); 

var intId; 
intId = setInterval(function(){ 
    if(check_buff % 3 == 0) { 
    DRAIN_ME = 1; 
    return; 
    } 
    DRAIN_ME = 0; 
},10); 

BufferStream.prototype._transform = function (chunk, encoding, done) { 
    this.buffer.push(String(chunk)); 
    while(DRAIN_ME > 0 && this.buffer.length > 0) { 
    this.push(this.buffer.shift()); 
    } 
    console.log(chunk.length); 
    console.log(this.buffer.length); 
    done(); 
}; 

var b = new BufferStream(); 
b.on('end', function(chunk) { 
    clearInterval(intId); 
}); 
r.pipe(b).pipe(w); 

ich für die kanonische Weise suchen a/durch Strom zu implementieren verwandeln, die alle Daten puffert, bis Rohr Anruf ist.

die folgenden Änderungen

BufferStream.prototype._transform = function (chunk, encoding, done) { 
    this.buffer.push(String(chunk)); 

    console.log(chunk.length); 
    console.log(this.buffer.length); 
    done(); 
}; 
...... 
BufferStream.prototype._flush = function (cb) { 
    var len = this.buffer.length; 
    for (var i = 0; i < len; i++) { 
    this.push(this.buffer.shift()); 
    }; 
    cb(); 
}; 

Sie können auch den lesbaren Stream anhalten, die in der Tat den beschreibbaren Strom unterbrechen, weil es den Empfang von Daten, dh stoppt ...

dies einen Um zu testen, erstellen ziemlich große Datei auf der Festplatte, dh 100 MB oder mehr und führen Sie diese ...

Der Grund für die sofortige Pause ist denn zu dem Zeitpunkt, zu dem das Intervall 10ms gefeuert hat, wurde die Datei möglicherweise bereits geschrieben. Es gibt Variationen dazu, dh ...

var fs = require('fs'); 
var readableStream = fs.createReadStream('file1.txt'); 
var writableStream = fs.createWriteStream('file2.txt'); 
readableStream.setEncoding('utf8'); 

var ready = 0; 
setInterval(function(){ 
    if(ready == 0) { 
    //console.log('pausing'); 
    readableStream.pause(); 
    ready = 1; 
    } 
    else { 
    //console.log('resuming'); 
    readableStream.resume(); 
    ready = 0; 
    } 
},100); 

readableStream.on('data', function(chunk) { 
    writableStream.write(chunk); 
    readableStream.pause(); 
}); 
+0

Dank bin auf der Suche ja ich die lesbaren pausieren kann, das ist nicht so schwer, aber was ist mit der Verwendung eines Through-/Transform-Streams, um die Daten zu puffern, während wir darauf warten, dass das Schreibprogramm für den Datenempfang bereit ist? –

Verwandte Themen