2017-07-18 1 views
0

Nach der RxJS 5 Handbuch des verbindet auf MulticastingLogik auszuführen, wenn RxJS 5 RefCount() Abschnitt oder abmeldet von der Quelle

... können wir ConnectableObservable dem RefCount() Methode (Referenzzählung), die zurückkehrt verwenden ein Observable, der verfolgt, wie viele Abonnenten es hat. Wenn die Anzahl der Abonnenten von 0 auf 1 steigt, wird connect() für uns aufgerufen, was die gemeinsame Ausführung startet. Nur wenn die Anzahl der Teilnehmer von 1 auf 0 abnimmt, wird sie vollständig abbestellt, wodurch die weitere Ausführung gestoppt wird.

Ich mag würde zu verstehen, ob es möglich ist, in jedem dieser Ereignisse einzuhaken und einige Logik ausführen, idealerweise, bevor die Quelle des beobachtbaren connect() oder unsubscribe() auftritt, sondern auch nach der Tat akzeptabel wäre.

Wenn es keine Möglichkeit gibt, dies mit dem Operator refCount() zu tun, wäre es sehr hilfreich, wenn Sie ein Beispiel geben könnten, wie Sie dies mit einem benutzerdefinierten Operator erreichen können.

Ich dachte, vielleicht könnte ich irgendwie die kompletteFn von do(nextFn,errFn,completeFn) verwenden, um in diese Haken, aber scheint nicht zu funktionieren, wie durch das folgende Snippet gezeigt.

var source = Rx.Observable.interval(500) 
 
    .do(
 
    (x) => console.log('SOURCE emitted ' + x), 
 
    (err) => console.log('SOURCE erred ' + err), 
 
    () => console.log('SOURCE completed ') 
 
); 
 
var subject = new Rx.Subject(); 
 
var refCounted = source.multicast(subject).refCount(); 
 
var subscription1, subscription2, subscriptionConnect; 
 

 
// This calls `connect()`, because 
 
// it is the first subscriber to `refCounted` 
 
console.log('observerA subscribed'); 
 
subscription1 = refCounted.subscribe({ 
 
    next: (v) => console.log('observerA: ' + v) 
 
}); 
 

 
setTimeout(() => { 
 
    console.log('observerB subscribed'); 
 
    subscription2 = refCounted.subscribe({ 
 
    next: (v) => console.log('observerB: ' + v) 
 
    }); 
 
}, 600); 
 

 
setTimeout(() => { 
 
    console.log('observerA unsubscribed'); 
 
    subscription1.unsubscribe(); 
 
}, 1200); 
 

 
// This is when the shared Observable execution will stop, because 
 
// `refCounted` would have no more subscribers after this 
 
setTimeout(() => { 
 
    console.log('observerB unsubscribed'); 
 
    subscription2.unsubscribe(); 
 
}, 2000);
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

+0

Was Ausgang Sie tun erwarten Sie von dem Beispiel, das Sie gemacht haben? Ich würde denken, dass du genau das suchst. – martin

Antwort

3

Sie eine Kombination aus .do(null,null, onComplete) vor Ihrem tatsächlichen Strom und .finally() nach Abschluss verwenden können/abmelden Ereignisse vor Abonnement haben und nach Abschluss/abmelden:

const source = Rx.Observable.empty() 
 
    .do(null,null,() => console.log('subscribed')) 
 
    .concat(Rx.Observable.interval(500)) 
 
    .finally(() => console.log('unsubscribed')) 
 
    .publish().refCount(); 
 

 
const sub1 = source 
 
    .take(5) 
 
    .subscribe(
 
    val => console.log('sub1 ' + val), 
 
    null, 
 
    () => console.log('sub1 completed') 
 
    ); 
 
const sub2 = source 
 
    .take(3) 
 
    .subscribe(
 
    val => console.log('sub2 ' + val), 
 
    null, 
 
    () => console.log('sub2 completed') 
 
); 
 

 
// simulate late subscription setting refCount() from 0 to 1 again      
 
setTimeout(() => { 
 
    source 
 
    .take(1) 
 
    .subscribe(
 
     val => console.log('late sub3 val: ' + val), 
 
     null, 
 
    () => console.log('sub3 completed') 
 
    ); 
 
    
 
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

Verwandte Themen