Ich versuche, einen websocket RXJS-basierten Wrapper zu schreiben.rxjs pausableBuffered mehrere Abonnements
Und ich kämpfe mit meinem Rxjs Verständnis.
Ich habe einen Pause-Stream, der die pausierbaren gepufferten Streams pausieren soll, wenn ein Fehler auftritt, und setze sie fort, sobald ich ein "ok" aus dem Websocket bekomme.
Irgendwie wird nur das erste Abonnement auf meine pausierbaren gepufferten Streams gefeuert. Von da an stapelt sich nur die Warteschlange höher.
Ich habe eine Jsbin vorbereitet, um das Problem zu reproduzieren.
https://jsbin.com/mafakar/edit?js,console
Dort werden die "msg recived" Strom nur Feuer für das erste Abonnement. Und dann beginnen der q und der Beobachter sich zu stapeln.
Ich habe irgendwie das Gefühl, dass es um heiße und kalte Obserables geht, aber ich kann die Probleme nicht erfassen. Ich würde jede Hilfe schätzen.
Vielen Dank im Voraus!
Ich habe meinen Code nach Ihrem Rat geändert! (Vielen Dank) Jetzt habe ich ein anderes Problem: Nachdem ich die Obserables auf "fromEvent" geändert habe, weiß ich nicht, wie ich die Streams "ersetzen" soll (aktuelle Abonnenten behalten), wenn ich die Verbindung zu meinem Server abbringe und wieder herstelle. Vorher konnte ich meine private Funktion "neu binden" und OnNext aufrufen, um die Ereignisse auszulösen. –