2014-02-09 6 views
12

Was wäre der idiomatische Weg, Werte eines Beobachtbaren um einen bestimmten Zeitraum zu erhalten? Nehmen wir zum Beispiel an, dass ich ein Observable aus einem großen Array erstellt habe und alle 2 Sekunden einen Wert erhalten möchte. Ist eine Kombination aus interval und selectMany der beste Weg?Separate beobachtbare Werte nach bestimmter Zeit in RxJS

Antwort

20

Für Ihre speziellen Beispiel ist die Idee, jeden Wert aus dem Array zu einer beobachtbaren abzubilden, die das Ergebnis nach einer Verzögerung ergeben, dann verketten den resultierenden Strom von Observablen:

var delayedStream = Rx.Observable 
    .fromArray([1, 2, 3, 4, 5]) 
    .map(function (value) { return Rx.Observable.return(value).delay(2000); }) 
    .concatAll(); 

Andere Beispiele könnten in der Tat Verwenden Sie timer oder interval. Es kommt nur darauf an.

Zum Beispiel, wenn Ihr Array wirklich sehr groß ist, dann verursacht das oben genannte eine Menge Speicherdruck (weil es N Observables für eine wirklich große N erzeugt). Hier ist eine Alternative, die zu interval lazily zu Fuß das Array verwendet:

var delayedStream = Rx.Observable 
    .interval(2000) 
    .take(reallyBigArray.length) // end the observable after it pulses N times 
    .map(function (i) { return reallyBigArray[i]; }); 

Dieser wird den nächsten Wert aus dem Array ergibt alle 2 Sekunden, bis es über die gesamte Array iteriert wird.

6

Während Brandons Antwort den Kern der Idee enthält, gibt es hier eine Version, die den ersten Gegenstand sofort liefert und dann die Zeit zwischen den folgenden Punkten einstellt.

var delay = Rx.Observable.empty().delay(2000); 

var items = Rx.Observable.fromArray([1,2,3,4,5]) 
    .map(function (x) { 
    return Rx.Observable.return(x).concat(delay); // put some time after the item 
    }) 
    .concatAll(); 

für neueren RxJS Aktualisiert:

var delay = Rx.Observable.empty().delay(2000); 

var items = Rx.Observable.fromArray([1,2,3,4,5]) 
    .concatMap(function (x) { 
    return Rx.Observable.of(x).concat(delay); // put some time after the item 
    }); 
+0

Hinweis 'return' Siehe ist jetzt' von': https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md –

+0

Dies kann nun 'sein concatMap' (Raten concatMap ist neuer als 2014?). Dadurch werden sie alle zusammen in eine Warteschlange gestellt (andernfalls werden sie alle zusammen mit einer Verzögerung ausgegeben) –

+0

z. '.concatMap (x => Observable.von (x) .concat (Observable.empty(). delay (5000)))' –

17

Ich denke, dass zip besser und besser lesbaren Code erzeugen verwenden, immer noch nur 3 Observablen verwenden.

var items = ['A', 'B', 'C']; 

Rx.Observable.zip(
    Rx.Observable.fromArray(items), 
    Rx.Observable.timer(2000, 2000), 
    function(item, i) { return item;} 
) 
+0

Beste Antwort bisher – gropapa

4

Stimmen Sie zu, dass zip ein sauberer Ansatz ist. Hier ist ein wiederverwendbares Funktion ein Intervall Strom für ein Array zu erzeugen:

function yieldByInterval(items, time) { 
    return Rx.Observable.from(items).zip(
    Rx.Observable.interval(time), 
    function(item, index) { return item; } 
); 
} 

// test 
yieldByInterval(['A', 'B', 'C'], 2000) 
    .subscribe(console.log.bind(console)); 

Dies baut auf farincz's answer, aber ist etwas kürzer als Instanz .zip Verfahren.

Auch ich habe Rx.Observable.from() verwendet, weil Rx.Observable.fromArray()deprecated ist.

4

Für RxJS 5:

Rx.Observable.from([1, 2, 3, 4, 5]) 
    .zip(Rx.Observable.timer(0, 2000), x => x) 
    .subscribe(x => console.log(x));