2015-12-12 15 views
7

Ich versuche, RxJs über ein Observable in meinem Stream zu durchlaufen, bis es in einem bestimmten Zustand ist, dann den Stream fortsetzen. Insbesondere konvertiere ich eine synchrone do/while-Schleife nach RxJs, aber ich nehme an, dass die gleiche Antwort auch für eine for- oder while-Schleife verwendet werden könnte.RxJs: Wie Schleife basierend auf dem Zustand der Observablen?

Ich dachte, ich könnte doWhile() dafür verwenden, aber es scheint, als hätte die Condition-Funktion keinen Zugriff auf das Element im Stream, was den Zweck für mich zu besiegen scheint.

ich nicht ganz sicher bin, was die richtige reaktive Terminologie ist das, was ich will, aber hier ist ein Beispiel dafür, was ich für werde: wäre

var source = new Rx.Observable.of({val: 0, counter: 3}); 

source.map(o => { 
    o.counter--; 
    console.log('Counter: ' + o.counter); 

    if (!o.counter) { 
    o.val = "YESS!"; 
    } 
    return o; 
}) 
.doWhile(o => { 
    return o.counter > 0; 
}) 
.subscribe(
    function (x) { 
     console.log('Next: ' + x.val); 
    }, 
    function (err) { 
     console.log('Error: ' + err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 

Die erwartete Ausgabe:

Angenommen, dies ist ein lösbares Problem, ich bin unklar, wie Sie den "Start" von wo Sie zurückgeben wollen, wenn Sie Schleife.

Antwort

7

Es gibt den Operator expand, der Sie in die Nähe bringt, damit Sie eine Selektorfunktion rekursiv aufrufen können. Eine leere Observable zurückzugeben wäre in diesem Fall Ihre Pause. Siehe jsbin:

var source = Rx.Observable.return({val: 0, counter: 3}) 
    .expand(value => { 
     if(!value.counter) return Rx.Observable.empty(); 
     value.counter -= 1; 
     if(!value.counter) value.val = 'YESS'; 
     return Rx.Observable.return(value) 
    }) 
    .subscribe(value => console.log(value.counter ? 
            'Counter: ' + value.counter : 
            'Next: ' + value.val)); 
+0

Hum. .. interessant, könnte ich in der Lage sein, diese Arbeit zu machen, werde ich versuchen und zurück – JBCP

+0

mit dieser, wenn ich .expand(). map() m y Downstream-Karte würde den Wert mehrmals verarbeiten. Ich möchte, dass der Downstream nur das Endergebnis verarbeitet. Ich denke, expand(). Filter(). Map() könnte funktionieren. – JBCP

+0

Hm. Die einzige andere Möglichkeit, die es ermöglicht, die Weitergabe von mehreren Werten stromabwärts zu umgehen, ist ein Thema. Ich wäre daran interessiert, eine saubere Lösung für mich selbst :). –

5

nicht genau das, was Sie wollen, aber in der Nähe, mit expand Operator und Signalisierung Ende der Rekursion mit Rx.Observable.empty (http://jsfiddle.net/naaycu71/3/):

var source = new Rx.Observable.of({val: 0, counter: 3}); 

source.expand(function(o) { 
    console.log('Counter: ' + o.counter); 
    o.counter--; 

return (o.counter >= 0) ? Rx.Observable.just(o) : Rx.Observable.empty() 
}) 
.subscribe(
    function (x) { 
     console.log('Next: ' , x); 
    }, 
    function (err) { 
     console.log('Error: ' + err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 

Ausgang:

Next: Object {val: 0, counter: 3} 
Counter: 3 
Next: Object {val: 0, counter: 2} 
Counter: 2 
Next: Object {val: 0, counter: 1} 
Counter: 1 
Next: Object {val: 0, counter: 0} 
Counter: 0 
Completed 
Verwandte Themen