2017-01-08 4 views
3

konvertieren, wenn ich einen Knoten js Strom haben, sagen wir zum Beispiel von so etwas wie process.stdin oder aus fs.createReadStream, wie kann ich diese konvertieren mit RxJs5 einen RxJs beobachtbare Strom zu sein?Wie Knoten lesbaren Stream zu RX beobachtbaren

Ich sehe, dass RxJs-Node hat eine fromReadableStream Methode, aber das sieht aus wie es nicht in knapp einem Jahr aktualisiert wurde.

+0

so funktioniert es oder? Wen kümmert es, wie oft es aktualisiert wird, wenn es funktioniert – smnbbrv

+0

@smnbbrv Kein Zweifel, es funktioniert gut, aber es ist RxJS4 und ist nicht kompatibel mit RxJS5. – cartant

+2

Sie können sich [Quelle] (https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L45-L83) ansehen, um zu sehen, was Sie für die Konvertierung benötigen - Die Implementierung ist ziemlich klein. – cartant

Antwort

5

Für jeder, der danach sucht, folgt Marks Empfehlung I adapted rx-node fromStream implementation for rxjs5.

import { Observable } from 'rxjs'; 

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52 
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') { 
    stream.pause(); 

    return new Observable((observer) => { 
    function dataHandler(data) { 
     observer.next(data); 
    } 

    function errorHandler(err) { 
     observer.error(err); 
    } 

    function endHandler() { 
     observer.complete(); 
    } 

    stream.addListener(dataEventName, dataHandler); 
    stream.addListener('error', errorHandler); 
    stream.addListener(finishEventName, endHandler); 

    stream.resume(); 

    return() => { 
     stream.removeListener(dataEventName, dataHandler); 
     stream.removeListener('error', errorHandler); 
     stream.removeListener(finishEventName, endHandler); 
    }; 
    }).share(); 
} 
+0

Ich habe das nicht getestet, seit ich von dem, an dem ich gearbeitet habe, weitergearbeitet habe, aber wenn jemand anderes es getan hat und funktioniert, akzeptiere ich diese Antwort :) – JuanCaicedo

+0

Ich benutze es und bis jetzt funktioniert es gut. Ich habe es nicht getestet getestet. –

2

Die auf RxJs5 portiert werden folgende sollte sowohl für die Arbeit v4 und v5 (Haftungsausschluss ungetestet):

fromStream: function (stream, finishEventName, dataEventName) { 
    stream.pause(); 

    finishEventName || (finishEventName = 'end'); 
    dataEventName || (dataEventName = 'data'); 

    return Observable.create(function (observer) { 

     // This is the "next" event 
     const data$ = Observable.fromEvent(stream, dataEventName); 

     // Map this into an error event 
     const error$ = Observable.fromEvent(stream, 'error') 
     .flatMap(err => Observable.throw(err)); 

     // Shut down the stream 
     const complete$ = Observable.fromEvent(stream, finishEventName); 

     // Put it all together and subscribe 
     const sub = data$ 
     .merge(error$) 
     .takeUntil(complete$) 
     .subscribe(observer); 

     // Start the underlying node stream 
     stream.resume(); 

     // Return a handle to destroy the stream 
     return sub; 
    }) 

    // Avoid recreating the stream on duplicate subscriptions 
    .share(); 
    }, 
Verwandte Themen