2017-02-12 6 views
8

Ich kann nicht herausfinden, wie publishReplay().refCount() funktioniert.rxjs 5 publishReplay refCount

Zum Beispiel (https://jsfiddle.net/7o3a45L1/):

var source = Rx.Observable.create(observer => { 
    console.log("call"); 
    // expensive http request 
    observer.next(5); 
}).publishReplay().refCount(); 

subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); 
subscription1.unsubscribe(); 
console.log(""); 

subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)}); 
subscription2.unsubscribe(); 
console.log(""); 

subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)}); 
subscription3.unsubscribe(); 
console.log(""); 

subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)}); 
subscription4.unsubscribe(); 

ergibt folgendes Ergebnis:

Anruf Observera: 5

observerB: 5 Anruf observerB: 5

observerC: 5 observerC: 5 call observerC: 5

observerD: 5 observerD: 5 observerD: 5 Anruf observerD: 5

1) Warum observerB sind, C und D mehrmals aufgerufen?

2) Warum wird "call" auf jeder Zeile und nicht am Anfang der Zeile gedruckt?

Auch, wenn ich publishReplay(1).refCount() aufrufen, ruft es observerB, C und D jeweils 2 mal.

Was ich erwarte ist, dass jeder neue Beobachter den Wert 5 genau einmal erhält und "Anruf" nur einmal gedruckt wird.

Antwort

13

publishReplay(x).refCount() kombiniert macht folgendes:

  • Es hat eine ReplaySubject zu schaffen, die x Emissionen bis wiederholen. Wenn x nicht definiert ist, wird der gesamte Stream wiederholt.
  • Es macht diese ReplaySubject Multicast kompatibel mit einem refCount() -Operator. Dies führt zu gleichzeitig Subskriptionen, die die gleichen Emissionen erhalten.

Ihr Beispiel enthält ein paar Probleme, die die Funktionsweise von Cloud Computing beeinflussen. Siehe den folgenden überarbeiteten Schnipsel:

var state = 5 
 
var realSource = Rx.Observable.create(observer => { 
 
    console.log("creating expensive HTTP-based emission"); 
 
    observer.next(state++); 
 
// observer.complete(); 
 
    
 
    return() => { 
 
    console.log('unsubscribing from source') 
 
    } 
 
}); 
 

 

 
var source = Rx.Observable.of('') 
 
    .do(() => console.log('stream subscribed')) 
 
    .ignoreElements() 
 
    .concat(realSource) 
 
.do(null, null,() => console.log('stream completed')) 
 
.publishReplay() 
 
.refCount() 
 
; 
 
    
 
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); 
 
subscription1.unsubscribe(); 
 
    
 
subscription2 = source.subscribe(v => console.log('observerB: ' + v)); 
 
subscription2.unsubscribe(); 
 
    
 
subscription3 = source.subscribe(v => console.log('observerC: ' + v)); 
 
subscription3.unsubscribe(); 
 
    
 
subscription4 = source.subscribe(v => console.log('observerD: ' + v)); 
 
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

Wenn diese Schnipsel laufen wir klar sehen können, dass es keine doppelten Werte für Observer emittiert D, ist es in der Tat neue Emissionen zu schaffen für jedes Abonnement . Woher?

Jedes Abonnement wird abbestellt, bevor das nächste Abonnement stattfindet. Dies führt dazu, dass refCount effektiv auf Null zurückgeht und kein Multicasting durchgeführt wird. Das Problem besteht darin, dass der realSource-Stream nicht abgeschlossen wird. Da wir kein Multicasting durchführen, erhält der nächste Teilnehmer eine neue Instanz von realSource über das ReplaySubject und die neuen Emissionen werden mit den zuvor bereits emittierten Emissionen vorangestellt.

Um zu verhindern, dass Ihr Stream die teure HTTP-Anfrage mehrmals aufruft, müssen Sie den Stream abschließen, damit das publishReplay weiß, dass es keine erneute Anmeldung benötigt.

4

Dies passiert, weil Sie publishReplay() verwenden. Es erstellt intern eine Instanz von ReplaySubject, die alle Werte speichert, die durchlaufen werden.

Da Sie Observable.create verwenden, wo Sie einen einzelnen Wert ausgeben, fügen Sie bei jedem Aufruf von source.subscribe(...) einen Wert an den Puffer in ReplaySubject an.

Sie bekommen nicht call am Anfang jeder Zeile gedruckt, weil es die ReplaySubject ist, wer seinen Puffer emittiert zuerst, wenn Sie sich anmelden und dann abonniert hat es sich zu seiner Quelle:

Für Implementierungsdetails finden Sie unter:

Gleiches gilt bei Verwendung von publishReplay(1). Zuerst gibt er das gepufferte Element aus ReplaySubject und dann noch ein weiteres Element aus observer.next(5);

+0

Die einzige in Verbindung stehende Antwort –

4

Allgemein: Die refCount bedeutet, dass der Strom heiß ist/shared solange es mindestens 1 Abonnent ist - ist es jedoch zurückgesetzt/kalt wenn es keine Abonnenten gibt.

Dies bedeutet, wenn Sie absolut sicher sein wollen, dass nichts mehr als einmal ausgeführt wird, sollten Sie nicht refCount() verwenden, sondern einfach connect den Stream, um es heiß zu setzen.

Als zusätzliche Anmerkung: Wenn Sie eine observer.complete() nach der observer.next(5); hinzufügen, erhalten Sie auch das Ergebnis, das Sie erwartet haben.


Nebenbei bemerkt: Haben Sie wirklich Ihre eigenen erstellen müssen Obervable hier? In 95% der Fälle reichen die vorhandenen Betreiber für den gegebenen Anwendungsfall aus.

Verwandte Themen