2017-08-24 1 views
1

Zusammenfassung: Ich benutze Rxjs und ein Neuling. Ich möchte ein solches Szenario mit beobachtbaren implementieren, hatte aber bis jetzt noch kein Glück.rxjs Puffer bis Timeout zurückgesetzt auf neue arg Ankünfte

Es gibt eine Funktion loadDetailsFromServer (itemIds), die eine Server-API aufruft und einige Items übergibt. Diese Funktion wird sporadisch aufgerufen. Um Server-Aufrufe zu optimieren, hier ist, was ich tun möchte: mit Ankunft des ersten Funktionsaufrufs wird ein Timeout ausgelöst. Wenn vor der Zeitüberschreitung neue Funktionsaufrufe eintreffen, wird der Zeitlimit zurückgesetzt, um erneut zu starten. Wenn das Zeitlimit eintritt, wird ein Serveraufruf durchgeführt und die Anzahl der Argumente wird auf Null zurückgesetzt.

hier ein Marmor-ish Diagramm ist:

Timer is 4 clicks. 
INPUTS IN TIME  1-2---3-4-----5--------6-7-------- 
loadDetailsFromServer [1,2,3,4] -  [5]   -[6,7] 

function called with [1,2,3,4] because no more calls after 4 clicks. 

Hinweis: diese das Suchfeld Probe ähnlich ist und die Ergebnisse vom Server erhalten, außer dass Zwischenwerte von Interesse sind, und nicht ignoriert werden/übersprungen. wenn

Antwort

0

Zum Beispiel haben Sie Quelle Observable wie folgt aus:

const Rx = require('rxjs/Rx'); 
const Observable = Rx.Observable; 

const TIMEOUT = 1000; 

const source = Observable.range(1, 20) 
    .concatMap(v => Observable.of(v).delay(Math.random() * 2000)); 

Dann können Sie ihre Werte mit scan puffern. Um den Puffer zurückzusetzen benutze ich .merge(bufferNotifier.mapTo(null)). Dann mit switchMap() ich warte immer auf 1000ms für die forkJoin() zu emittieren. Wenn es nicht durch eine andere beobachtbare „überschreibt“ ist, weil neue Puffer angekommen:

const bufferNotifier = new Subject(); 

const chain = source 
    .do(undefined, undefined,() => bufferNotifier.complete()) // properly complete the chain 
    .merge(bufferNotifier.mapTo(null)) // reset buffer Subject 
    .scan((acc, val) => { 
     if (val === null) { 
      return []; 
     } 
     acc.push(val); 
     return acc; 
    }, []) 
    .filter(arr => arr.length > 0) 
    .switchMap(buffer => { // wait 1s until emitting the buffer further 
     return Observable.forkJoin(
      Observable.of(buffer), 
      Observable.timer(1000).take(1), 
      arr => arr 
     ); 
    }) 
    .do(() => bufferNotifier.next()) // trigger reset the buffer 
    .subscribe(console.log); 

Dies gibt zum Beispiel:

[ 1 ] 
[ 2 ] 
[ 3, 4 ] 
[ 5 ] 
[ 6, 7 ] 
[ 8, 9, 10, 11, 12 ] 
[ 13 ] 
[ 14, 15 ] 
[ 16 ] 
[ 17 ] 
[ 18 ] 
[ 19, 20 ] 
0

Wenn Sie haben ein ähnliches source beobachtbar martin Antwort, so etwas wie diese funktionieren könnte:

source 
    .buffer(source.debounceTime(250)) 
    .subscribe(console.log); 

buffer alle emittierten Werte bis zum angegebenen beobachtbaren aussendet sammelt. In diesem Fall wartet es bis debounceTime abgibt. CodePen: https://codepen.io/anon/pen/PKBaZm?editors=1010

Verwandte Themen