Zunächst einmal kann die Schaffung des Themas wie folgt vereinfacht werden:
const subject = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.share();
Der Anteil Methode wird ein Gegenstand aus dem Stream erstellen. Wenn Sie diese Subjekt-Instanz an jeden Abonnenten zurückgeben, erhalten Sie dasselbe Verhalten und es sieht besser aus.
a) if the events come in super fast is the handler going to process
them synchronously and in the right order given the way I have it?
Ereignisse werden nacheinander und in der richtigen Reihenfolge durch die gesamte Kette geschoben. Das bedeutet, dass ein Ereignis, das über das 'fromEvent' eingeht, bis zu dem Punkt, an dem Sie es abonniert haben, durch die gesamte Kette geschoben wird, bevor der nächste Wert bearbeitet wird (es sei denn, es gibt einen asynchronen Operator dazwischen :)). Ben Lesh erklärte dies bei eckigen Verbindung 2015: https://www.youtube.com/watch?v=KOOT7BArVHQ (Sie können das ganze Gespräch beobachten, aber es ist um min 17, wo er Arrays mit Observablen vergleicht).
b) if the different handlers handle the event at different speeds are
they all going to wait till the slowest handler is through before the
next event is provided? or will they all sort of buffer and handle at
they're own pace?
Sie werden die Ereignisse in ihrem eigenen Tempo behandeln. Überprüfen Sie das folgende Beispiel:
let interval$ = Rx.Observable.interval(1000).share();
interval$.concatMap((val) => {
console.log('called');
return Rx.Observable.of(val).delay(3000)
})
.subscribe((val) => console.log("slow ", val));
interval$.subscribe((val) => console.log("fast ", val));
Hier verwende ich ein Intervall beobachtbar, dass ich in ein Thema umwandeln. So wird jede Sekunde ein Ereignis gesendet. Ich habe ein Abonnement, das einen Wert nimmt, diesen Wert behandelt (der 2 Sekunden dauert) und dann den nächsten (mit der concatMap) nehme. Und ein weiteres Abonnement, das sie sofort verarbeitet. Wenn Sie diesen Code ausführen (jsbin hier: https://jsbin.com/zekalab/edit?js,console), werden Sie sehen, dass beide die Ereignisse in ihrem eigenen Tempo behandeln.
So warten sie nicht auf den langsamsten Handler und es wird intern gepuffert.
Die Situation, die Sie beschreiben, könnte potenziell eine gefährliche Situation haben, wenn der langsamste Prozessor langsamer ist als die Frequenz, mit der die Ereignisse ausgelöst werden. In diesem Fall würde Ihr Puffer weiter wachsen und Ihre Anwendung würde schließlich abstürzen. Dies ist ein Konzept namens Gegendruck. Sie erhalten Ereignisse schneller als Sie sie verarbeiten. In diesem Fall müssen Sie Operatoren wie 'buffer' oder 'window' auf den langsamsten Prozessoren verwenden, um diese Situation zu vermeiden.
genial. gute Antwort. Vielen Dank. Ich schaute auf den Puffer und schien Ereignisse zu gruppieren. Was ich brauche, ist im Grunde eine Warteschlange am Ende, die ich mit dem Betreff verbinden kann und das wird gepuffert. Würde Puffer dafür arbeiten? – Raif
Yep sollte den Trick machen – KwintenP