2017-12-18 2 views
4

Warnung: RxJS newb hier.RxJS: Wie mehrere verschachtelte Observablen mit Puffer kombiniert werden

Hier ist meine Herausforderung:

  1. Wenn ein onUnlink$ beobachtbaren aussendet ...
  2. beginnen sofort Werte aus einer onAdd$ beobachtbar, für maximal 1 Sekunde (ich nenne diese Partition onAddBuffer$) erfassen .
  3. Abfrage eine Datenbank (eine doc$ beobachtbaren erstellen), ein Modell holen wir gegen eine der onAdd$ Werte aus der onAddBuffer$
  4. Wenn einer der Werte übereinstimmen verwenden werden, die doc$ Wert beobachtbaren übereinstimmt, emittieren
  5. nicht
  6. wenn keiner der Werte aus dem onAddBuffer$ beobachtbaren entspricht den doc$ Wert, oder wenn die beobachtbaren onAddBuffer$ nie emittiert, emittiert den doc$ Wert

das ist meine beste Vermutung war:

// for starters, concatMap doesn't seem right -- I want a whole new stream 
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => { 

    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })) 

    const onAddBuffer$ = onAdd$ 
    .buffer(doc$) // capture events while fetching from db -- not sure about this 
    .takeUntil(Rx.Observable.timer(1000)); 

    // if there is a match, emit nothing. otherwise wait 1 second and emit doc 
    return doc$.switchMap(doc => 
    Rx.Observable.race( 
     onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()), 
     Rx.Observable.timer(1000).mapTo(doc) 
    ) 
); 
}); 

docsToRemove$.subscribe(doc => { 
    // should only ever be invoked (with doc -- the doc$ value) 1 second 
    // after `onUnlink$` emits, when there are no matching `onAdd$` 
    // values within that 1 second window. 
}) 

Dies gibt immer EmptyObservable aus. Vielleicht ist es, weil single scheint undefined emittieren, wenn es keine Übereinstimmung gibt, und ich erwarte es nicht zu emittieren, wenn es keine Übereinstimmung gibt? Das Gleiche passiert mit find.

Wenn ich single zu filter ändere, emittiert nichts jemals.

Zur Info: Dies ist ein Umbenennungs Szenario mit Dateisystemereignissen - wenn ein add Ereignis innerhalb 1 Sekunde eines unlink Ereignisses folgt und die emittierte Datei-Hashes Spiel, nichts zu tun, weil es ein rename ist. Ansonsten ist es eine echte unlink und es sollte die Datenbank doc ausgegeben werden entfernt werden.

+0

Es klingt, als ob Sie hier eine ziemlich fiese Race Condition aufbauen. Timeouts sind normalerweise kein guter Weg, um damit umzugehen - wenn die Dinge aus irgendeinem Grund länger dauern, verlieren Sie Daten. –

+1

ja hier besteht durchaus Potenzial für eine Race Condition. es könnte letztendlich diesen Ansatz zunichte machen.aber es schien eine gute Gelegenheit zu sein, Rxjs zu lernen. – glortho

Antwort

3

Das ist meine Vermutung, wie Sie dies tun könnte:

onUnlink$.concatMap(unlinkValue => { 
    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share(); 
    const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$); 
    const onAddBuffer$ = onAdd$.buffer(bufferDuration$); 

    return Observable.forkJoin(onAddBuffer$, doc$) 
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ }); 
}); 

Der single() Betreiber ein wenig schwierig ist, weil es das Element emittiert, das das Prädikat Funktion passt nur nach der Quelle beobachtbare abgeschlossen (oder sendet ein Fehler, wenn zwei oder keine übereinstimmenden Elemente vorhanden sind).

Die race() ist auch schwierig. Wenn einer der Quell-Observables abgeschlossen ist und keinen Wert ausgibt, wird race()nur abgeschlossen und nichts ausgegeben. Ich habe dies vor einiger Zeit gemeldet und das ist das richtige Verhalten, siehe https://github.com/ReactiveX/rxjs/issues/2641.
Ich denke, das ist, was in Ihrem Code schief gelaufen ist.

Beachten Sie auch, dass .mapTo(Rx.Observable.empty()) jeden Wert in eine Instanz von Observable abbildet. Wenn Sie alle Werte ignorieren möchten, können Sie den Operator filter(() => false) oder den Operator ignoreElements() verwenden.

Verwandte Themen