2013-07-04 8 views
17

anlegen würde Ich mag in einer zwei Node.js Ströme kombinieren, indem sie kochend, wenn möglich. Ich verwende Transform Streams.einen Node.js Strom aus zwei verrohrt Ströme

Mit anderen Worten, würde ich meine Bibliothek wie myStream zurückzukehren für Menschen zu verwenden. Zum Beispiel könnten sie schreiben:

process.stdin.pipe(myStream).pipe(process.stdout); 

Und intern verwende ich eine Drittanbieter-vendorStream, die etwas Arbeit tut, eingesteckt in meine eigene Logik in myInternalStream enthalten. Also, was oben wäre würde übersetzen in:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout); 

Kann ich so etwas tun? Ich habe versucht var myStream = vendorStream.pipe(myInternalStream), aber das funktioniert offensichtlich nicht.

Um eine Analogie zu machen mit bash, lassen Sie uns sagen, dass ich ein Programm schreiben möchten, wenn der Brief überprüft h in der letzten Zeile von einigen Strom vorhanden ist (tail -n 1 | grep h), kann ich ein Shell-Skript erstellen:

# myscript.sh 
tail -n 1 | grep h 

Und dann, wenn die Menschen tun:

$ printf "abc\ndef\nghi" | . myscript.sh 

Es funktioniert einfach.

Dies ist, was ich bisher:

// Combine a pipe of two streams into one stream 

var util = require('util') 
    , Transform = require('stream').Transform; 

var chunks1 = []; 
var stream1 = new Transform(); 
var soFar = ''; 
stream1._transform = function(chunk, encoding, done) { 
    chunks1.push(chunk.toString()); 
    var pieces = (soFar + chunk).split('\n'); 
    soFar = pieces.pop(); 
    for (var i = 0; i < pieces.length; i++) { 
    var piece = pieces[i]; 
    this.push(piece); 
    } 
    return done(); 
}; 

var chunks2 = []; 
var count = 0; 
var stream2 = new Transform(); 
stream2._transform = function(chunk, encoding, done) { 
    chunks2.push(chunk.toString()); 
    count = count + 1; 
    this.push(count + ' ' + chunk.toString() + '\n'); 
    done(); 
}; 

var stdin = process.stdin; 
var stdout = process.stdout; 

process.on('exit', function() { 
    console.error('chunks1: ' + JSON.stringify(chunks1)); 
    console.error('chunks2: ' + JSON.stringify(chunks2)); 
}); 
process.stdout.on('error', process.exit); 


// stdin.pipe(stream1).pipe(stream2).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

// Best working solution I could find 
var stream3 = function(src) { 
    return src.pipe(stream1).pipe(stream2); 
}; 
stream3(stdin).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

Ist das überhaupt möglich? Lass mich wissen, wenn das, was ich versuche, nicht klar ist.

Danke!

Antwort

25

Sie etwas sehen können zu Ihrem Strom geleitet werden und dann unpipe es und Rohr es zu den Streams Sie Interesse an:

var PassThrough = require('stream').PassThrough; 

var stream3 = new PassThrough(); 

// When a source stream is piped to us, undo that pipe, and save 
// off the source stream piped into our internally managed streams. 
stream3.on('pipe', function(source) { 
    source.unpipe(this); 
    this.transformStream = source.pipe(stream1).pipe(stream2); 
}); 

// When we're piped to another stream, instead pipe our internal 
// transform stream to that destination. 
stream3.pipe = function(destination, options) { 
    return this.transformStream.pipe(destination, options); 
}; 

stdin.pipe(stream3).pipe(stdout); 

Sie diese Funktionalität in Ihre eigene konstruierbar Stream Klasse extrahieren :

var util = require('util'); 
var PassThrough = require('stream').PassThrough; 

var StreamCombiner = function() { 
    this.streams = Array.prototype.slice.apply(arguments); 

    this.on('pipe', function(source) { 
    source.unpipe(this); 
    for(i in this.streams) { 
     source = source.pipe(this.streams[i]); 
    } 
    this.transformStream = source; 
    }); 
}; 

util.inherits(StreamCombiner, PassThrough); 

StreamCombiner.prototype.pipe = function(dest, options) { 
    return this.transformStream.pipe(dest, options); 
}; 

var stream3 = new StreamCombiner(stream1, stream2); 
stdin.pipe(stream3).pipe(stdout); 
+0

Vielen Dank @brandon, das ist genial! Aktualisiert meinen Gist https://gist.github.com/nicolashery/5910969 –

+0

Das ist genial. Ich dachte darüber nach, etwas Ähnliches zu tun, aber ich hatte einfach kein Selbstvertrauen, dass mir die Subtilität fehlte, die meine Lösung falsch machen würde. Vielen Dank für das Vertrauen – FellowMD

+0

FWIW, für diese Lösung zu arbeiten, sind Sie zu Rohr stream3 erforderlich ist, um eine Quelle (in diesem Fall stdin), bevor sie an stdout kochend. Also, kein stream3.pipe (stdout); stream3.write (Daten); Aber das ist eine große Hilfe! Vielen Dank! –

2

eine Option ist vielleicht multipipe zu verwenden, die zusammen Sie verketten mehrere Transformationen können als einzelne Transformationsstrom eingewickelt:

// my-stream.js 
var multipipe = require('multipipe'); 

module.exports = function createMyStream() { 
    return multipipe(vendorStream, myInternalStream); 
}; 

Dann können Sie tun:

var createMyStream = require('./my-stream'); 

var myStream = createMyStream(); 

process.stdin.pipe(myStream).pipe(process.stdout);