2017-02-20 1 views
0

Ich bin vertraut mit Node-Streams, aber ich habe Probleme mit Best Practices für die Abstraktion von Code, die ich oft in einem einzigen Pipe-Schritt wiederverwende.Knoten - Abstrahieren von Pipe-Schritten in Funktion

Hier ist eine abgespeckte Version von dem, was ich heute schreibe:

inputStream 
.pipe(csv.parse({columns:true}) 
.pipe(csv.transform(function(row) {return transform(row); })) 
.pipe(csv.stringify({header: true}) 
.pipe(outputStream); 

Die eigentliche Arbeit geschieht in transform(). Die einzigen Dinge, die sich wirklich ändern, sind inputStream, transform() und outputStream. Wie ich schon sagte, das ist eine abgespeckte Version dessen, was ich tatsächlich benutze. Ich habe eine Menge Fehlerbehandlung und Protokollierung bei jedem Pipe-Schritt, weshalb ich letztendlich versuche, den Code zu abstrahieren.

Was ich suche zu schreiben, ist ein einzelnes Rohr Schritt, etwa so:

inputStream 
.pipe(csvFunction(transform(row))) 
.pipe(outputStream); 

Was ich kämpfen, zu verstehen, wie diese Rohr Schritte in eine einzige Funktion zu aktivieren, die einen Strom übernimmt und gibt einen Stream zurück. Ich habe Bibliotheken wie through2 angeschaut, aber ich bin mir nicht sicher, wie ich dorthin komme, wo ich hin will.

Antwort

0

Hier ist, was ich am Ende mit gehen. Ich benutzte die Bibliothek through2 und die streaming API of the csv library, um die Rohrfunktion zu erstellen, die ich suchte.

var csv = require('csv'); 
    through = require('through2'); 

module.exports = function(transformFunc) { 
    parser = csv.parse({columns:true, relax_column_count:true}), 
    transformer = csv.transform(function(row) { 
     return transformFunc(row); 
    }), 
    stringifier = csv.stringify({header: true}); 

    return through(function(chunk,enc,cb){ 
     var stream = this; 

      parser.on('data', function(data){ 
       transformer.write(data); 
      }); 

      transformer.on('data', function(data){ 
       stringifier.write(data); 
      }); 

      stringifier.on('data', function(data){ 
       stream.push(data); 
      }); 

      parser.write(chunk); 

      parser.removeAllListeners('data'); 
      transformer.removeAllListeners('data'); 
      stringifier.removeAllListeners('data'); 
      cb(); 
    }) 
} 

Es ist erwähnenswert, den Teil, wo ich die Ereignis-Listener zum Ende zu entfernen, dies war aufgrund in Speicherfehler zu laufen, wo ich zu viele Event-Listener erstellt hatte. Ich versuchte zunächst, dieses Problem zu lösen, indem ich Ereignisse mit once hörte, aber das verhinderte, dass nachfolgende Blöcke gelesen und an den nächsten Pipe-Schritt weitergeleitet wurden.

Lassen Sie mich wissen, wenn jemand Feedback oder zusätzliche Ideen hat.

2

können Sie die PassThrough Klasse wie folgt verwenden:

var PassThrough = require('stream').PassThrough; 

var csvStream = new PassThrough(); 
csvStream.on('pipe', function (source) { 
    // undo piping of source 
    source.unpipe(this); 
    // build own pipe-line and store internally 
    this.combinedStream = 
    source.pipe(csv.parse({columns: true})) 
     .pipe(csv.transform(function (row) { 
     return transform(row); 
     })) 
     .pipe(csv.stringify({header: true})); 
}); 

csvStream.pipe = function (dest, options) { 
    // pipe internal combined stream to dest 
    return this.combinedStream.pipe(dest, options); 
}; 

inputStream 
    .pipe(csvStream) 
    .pipe(outputStream); 
+0

Wie würde ich die Transform (Zeilen) -Referenz in den Stream übergeben? Wäre es nicht besser, einen Transformations-Stream zu verwenden? Ich habe gelesen, dass PassThrough mehr für die Überwachung eines Streams als tatsächlich Arbeit daran ist? – AdamPat

+0

Hängt davon ab, wie Sie Ihr Stream-Handling packen. Stellen Sie die Funktion einfach an einen Ort, an dem sie sichtbar ist, oder geben Sie sie als Referenz ein. Und natürlich können Sie Ihre Lösung in einem Transform-Stream implementieren (schauen Sie hier https://nodejs.org/api/stream.html#stream_implementing_a_transform_stream). Die Idee mit einem PassThrough-Stream zu zeigen, war einfach der einfachste Weg, den ich gefunden habe. – Marc

Verwandte Themen