Warnung: RxJS newb hier.RxJS: Wie mehrere verschachtelte Observablen mit Puffer kombiniert werden
Hier ist meine Herausforderung:
- Wenn ein
onUnlink$
beobachtbaren aussendet ... - beginnen sofort Werte aus einer
onAdd$
beobachtbar, für maximal 1 Sekunde (ich nenne diese PartitiononAddBuffer$
) erfassen . - Abfrage eine Datenbank (eine
doc$
beobachtbaren erstellen), ein Modell holen wir gegen eine deronAdd$
Werte aus deronAddBuffer$
- Wenn einer der Werte übereinstimmen verwenden werden, die
doc$
Wert beobachtbaren übereinstimmt, emittieren nicht
- wenn keiner der Werte aus dem
onAddBuffer$
beobachtbaren entspricht dendoc$
Wert, oder wenn die beobachtbarenonAddBuffer$
nie emittiert, emittiert dendoc$
Wert
das ist meine beste Vermutung war:
// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => {
const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue }))
const onAddBuffer$ = onAdd$
.buffer(doc$) // capture events while fetching from db -- not sure about this
.takeUntil(Rx.Observable.timer(1000));
// if there is a match, emit nothing. otherwise wait 1 second and emit doc
return doc$.switchMap(doc =>
Rx.Observable.race(
onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()),
Rx.Observable.timer(1000).mapTo(doc)
)
);
});
docsToRemove$.subscribe(doc => {
// should only ever be invoked (with doc -- the doc$ value) 1 second
// after `onUnlink$` emits, when there are no matching `onAdd$`
// values within that 1 second window.
})
Dies gibt immer EmptyObservable
aus. Vielleicht ist es, weil single
scheint undefined
emittieren, wenn es keine Übereinstimmung gibt, und ich erwarte es nicht zu emittieren, wenn es keine Übereinstimmung gibt? Das Gleiche passiert mit find
.
Wenn ich single
zu filter
ändere, emittiert nichts jemals.
Zur Info: Dies ist ein Umbenennungs Szenario mit Dateisystemereignissen - wenn ein add
Ereignis innerhalb 1 Sekunde eines unlink
Ereignisses folgt und die emittierte Datei-Hashes Spiel, nichts zu tun, weil es ein rename
ist. Ansonsten ist es eine echte unlink
und es sollte die Datenbank doc ausgegeben werden entfernt werden.
Es klingt, als ob Sie hier eine ziemlich fiese Race Condition aufbauen. Timeouts sind normalerweise kein guter Weg, um damit umzugehen - wenn die Dinge aus irgendeinem Grund länger dauern, verlieren Sie Daten. –
ja hier besteht durchaus Potenzial für eine Race Condition. es könnte letztendlich diesen Ansatz zunichte machen.aber es schien eine gute Gelegenheit zu sein, Rxjs zu lernen. – glortho