2017-10-19 1 views
2

fusionieren Ich habe zwei Observablen:RxJs wie zwei überlappende beobachtbaren in eine

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-| 
-13--14--15--16--17--18--19-----20---------21--------------22------23--24--> 

Die erste enthält eine steigende Zahl, aber stoppt nach einer gewissen Zeit (dies ist das Cursor-Ergebnis aus der Datenbank) Der zweite ist kontinuierlich steigende Anzahl. Enthält eine Nummer von der ersten, aber hören Sie nicht auf zu emittieren. (Dies sind die neu eingefügten Daten in die Datenbank)

Ich möchte diese zwei beobachtbaren eine kontinuierliche wie dies beobachtbare aussehen:

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24--> 

Diese beobachtbaren nur einmal jede Zahl enthält, die Emissions Ordnung zu halten.

Wie kann es mit so wenig Speicher wie möglich gelöst werden?

+0

fragen sich, wie hast du so zwei überlappende Observable erhalten Streams? Was ist ihre Bedeutung in Ihrem Problem? –

+1

Ich benutze Rethinkdb. Ich habe alte Daten in der Datenbank, die vom Cursor gelesen werden, und neu eingefügte Daten, die vom Änderungsvorschub ausgegeben werden. Während ich die Daten vom Cursor lese, werden die neu eingefügten Daten ebenfalls vom Cursor gelesen. Das verursacht die Überlappung –

Antwort

2

Ich denke, der beste Ansatz hier ist es, b $ zu puffern, bis ein $ -Stream b $ erreicht, dann alle gepufferten Elemente von b $ ausgeben und zu b $ wechseln. Etwas wie folgt aus:

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'; 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'; 
 

 
const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x)); 
 

 
const a$ = fromMarble(a).share(); 
 
const b$ = fromMarble(b).share(); 
 

 
const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share(); 
 

 
const distinct$ = Rx.Observable.merge(
 
\t a$.takeUntil(switchingSignal$).map(x => x + '(from a)'), 
 
\t b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'), 
 
\t b$.skipUntil(switchingSignal$).map(x => x + '(from b$)') 
 
); 
 

 
distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>

2

Sie können dies tun, indem alle Elemente aus dem ersten Strom unter verketteten (.concat) mit dem zweiten Strom mit Ausnahme von (.skipWhile einschließlich) Elementen vor spätestens einem (.last)

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15' 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24' 
 
const fromMarble = str => Rx.Observable.defer(() => { 
 
    console.log('side effect from subscribing to: ' + str); 
 
    return Rx.Observable.from(str.split('-').filter(v => v.length)); 
 
}); 
 

 
const a$ = fromMarble(a); 
 
const b$ = fromMarble(b); 
 

 
const distinct$ = Rx.Observable.concat(
 
    a$, 
 
    a$.last().switchMap(latest => 
 
    // .skipWhile + .skip(1) => skipWhile but inclusive 
 
    b$.skipWhile(v => v !== latest).skip(1) 
 
), 
 
); 
 

 
distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Auch wenn Sie beim Abonnieren Nebenwirkungen haben (zum Beispiel beim Abonnieren - der neue Cursor wird erstellt), können Sie diesen Nebeneffekt für alle Teilnehmer freigeben, indem Sie beispielsweise const a$ = fromMarble(a).shareReaplay() verwenden.

Sie können mehr über den Austausch Nebenwirkungen lesen:

+0

Schöne Lösung. Falls jedoch der b $ -Stream nach dem a $ -Stream ausgegeben wird, wird der distinkte $ -Stream unnötigerweise verzögert. Nicht sicher, ob es ein Problem ist, aber es könnte ohne Verzögerung gelöst werden. – ZahiC

+0

Netter Fang @ ZahiC und ein anderes Problem sind, wenn 'b $' leer ist - du wirst nichts bekommen. Also habe ich meine Antwort geändert, um diese Fälle zu behandeln –

+0

Das ist definitiv die Verzögerung zu lösen, wenn b $ nach einem $ emittiert, aber jetzt könnten Sie b $ zu spät abonniert haben (der verkettete Stream wird abonniert, wenn der erste Stream abgeschlossen ist). Wenn das b $ -Abonnement Zeit benötigt (z. B. eine Verbindung zu einer Datenbank herstellt), wird der Wechsel zu b $ langsamer. Sie können dies lösen, indem Sie eine Verbindung zu b $ eher herstellen (hot Observable), aber dann verlieren Sie möglicherweise einige Elemente. Ich werde eine weitere Lösung nur als Referenz hinzufügen – ZahiC

Verwandte Themen