2016-10-17 5 views
0

Ich versuche, einen websocket RXJS-basierten Wrapper zu schreiben.rxjs pausableBuffered mehrere Abonnements

Und ich kämpfe mit meinem Rxjs Verständnis.

Ich habe einen Pause-Stream, der die pausierbaren gepufferten Streams pausieren soll, wenn ein Fehler auftritt, und setze sie fort, sobald ich ein "ok" aus dem Websocket bekomme.

Irgendwie wird nur das erste Abonnement auf meine pausierbaren gepufferten Streams gefeuert. Von da an stapelt sich nur die Warteschlange höher.

Ich habe eine Jsbin vorbereitet, um das Problem zu reproduzieren.

https://jsbin.com/mafakar/edit?js,console

Dort werden die "msg recived" Strom nur Feuer für das erste Abonnement. Und dann beginnen der q und der Beobachter sich zu stapeln.

Ich habe irgendwie das Gefühl, dass es um heiße und kalte Obserables geht, aber ich kann die Probleme nicht erfassen. Ich würde jede Hilfe schätzen.

Vielen Dank im Voraus!

Antwort

1

Es ist nicht das Kalt/Heiß-Problem. Was Sie in Ihrer onMessage tun, ist abonnieren, dann entsorgen. Die Verfügung beendet die Sequenz. Die onMessageStream sollte nur einmal abonniert werden, zum Beispiel im Konstruktor:

this.onmessageStream.subscribe(message => console.log('--- msg --- ', message.data)); 

Der abonnieren Block, einschließlich der dispose sollte entfernt werden.

Beachten Sie auch, dass Sie replaySubject ohne eine Anzahl verwendet haben. Dies bedeutet, dass die Warteschlange alle vorherigen Werte enthält. Wenn dies nicht ein gewünschtes Verhalten ist, wird in Betracht gezogen, es zu .replaySubject(1)

zu ändern. Hier ist ein working jsbin.

1

Wie @Meir wies dispose in einem Subskriptionsblock ist ein Nein Nein, da sein Verhalten nicht deterministisch ist. Im Allgemeinen würde ich die Verwendung von Subjects vermeiden und stattdessen auf Factory-Methoden zurückgreifen. Sie können eine Refactoring-Version siehe hier: https://jsbin.com/popixoqafe/1/edit?js,console

Ein schneller Abbau der Änderungen:

class WebSocketWrapper { 
    // Inject the pauser from the external state 
    constructor(pauser) { 

    // Input you need to keep as a subject 
    this.input$ = new Rx.Subject(); 

    // Create the socket 
    this._socket = this._connect();  

    // Create a stream for the open event 
    this.open$ = Rx.Observable.fromEvent(this._socket, 'open'); 

    // This concats the external pauser with the 
    // open event. The result is a pauser that won't unpause until 
    // the socket is open. 
    this.pauser$ = Rx.Observable.concat(
     this.open$.take(1).map(true) 
     pauser || Rx.Observable.empty() 
    ) 
     .startWith(false); 

    // subscribe and buffer the input 
    this.input$ 
     .pausableBuffered(this.pauser$) 
     .subscribe(msg => this._socket.send(msg)); 

    // Create a stream around the message event 
    this.message$ = Rx.Observable.fromEvent(this._socket, 'message') 

     // Buffer the messages 
     .pausableBuffered(this.pauser$) 

     // Create a shared version of the stream and always replay the last 
     // value to new subscribers 
     .shareReplay(1); 
    } 

    send(request) { 
    // Push to input 
    this.input$.onNext(request); 
    } 

    _connect() { 
    return new WebSocket('wss://echo.websocket.org'); 
    } 
} 

Als beiseite sollten Sie auch wie source auf interne Variablen nicht darauf angewiesen, die für externen Verbrauch nicht gemeint sind. Obwohl RxJS 4 relativ stabil ist, können diese unter Ihnen geändert werden, da diese nicht für den öffentlichen Verbrauch bestimmt sind.

+0

Ich habe meinen Code nach Ihrem Rat geändert! (Vielen Dank) Jetzt habe ich ein anderes Problem: Nachdem ich die Obserables auf "fromEvent" geändert habe, weiß ich nicht, wie ich die Streams "ersetzen" soll (aktuelle Abonnenten behalten), wenn ich die Verbindung zu meinem Server abbringe und wieder herstelle. Vorher konnte ich meine private Funktion "neu binden" und OnNext aufrufen, um die Ereignisse auszulösen. –

Verwandte Themen