2016-03-02 3 views
6

Ich versuche RxJS für eine einfache kurze Umfrage zu verwenden. Es muss eine Anfrage einmal alle delay Sekunden an den Standort path auf dem Server, endet, sobald eine von zwei Bedingungen erreicht werden: entweder der Rückruf isComplete(data) gibt True zurück oder es hat versucht, den Server mehr als maxTries. Hier ist der Grundcode:RxJS 5.0 "do while" wie Mechanismus

newShortPoll(path, maxTries, delay, isComplete) { 
    return Observable.interval(delay) 
    .take(maxTries) 
    .flatMap((tryNumber) => http.get(path)) 
    .doWhile((data) => !isComplete(data)); 
    } 

Allerdings ist DoWhile nicht in existieren RxJS 5,0, so dass der Zustand, in dem es nur den Server maxTries funktioniert, dank der Aufnahme() -Aufruf versuchen können, aber die isComplete Zustand nicht Arbeit. Wie kann ich es so machen, dass das beobachtbare next() -Werte bis isComplete true zurückgibt, an welchem ​​Punkt next() der Wert und complete() wird.

Ich sollte beachten, dass takeWhile() funktioniert nicht für mich hier. Es gibt nicht den letzten Wert zurück, der eigentlich der wichtigste ist, da wir wissen, dass es fertig ist.

Vielen Dank!

+0

Möglicherweise ein Duplikat: http://stackoverflow.com/questions/36007911/rxjs-poll-until-interval-done-or-correct-data-received –

+0

Es ist kein duplizieren in dem Sinne, dass die Frage nach einem Ersatz von "doWhile" verlangt. –

Antwort

0

Wir können eine Dienstprogrammfunktion erstellen, um ein zweites Observable zu erstellen, das jeden Gegenstand ausstrahlt, den das innere Observable aussendet; jedoch werden wir die OnCompleted Funktion aufrufen, sobald unsere Bedingung erfüllt ist:

function takeUntilInclusive(inner$, predicate) { 
    return Rx.Observable.create(observer => { 
     var subscription = inner$.subscribe(item => { 
      observer.onNext(item); 

      if (predicate(item)) { 
       observer.onCompleted(); 
      } 
     }, observer.onError, observer.onCompleted); 


     return() => { 
      subscription.dispose(); 
     } 
    }); 
} 

Und hier ein kurzer Schnipsel unser neues Dienstprogramm-Methode:

const inner$ = Rx.Observable.range(0, 4); 
const data$ = takeUntilInclusive(inner$, (x) => x > 2); 
data$.subscribe(x => console.log(x)); 

// >> 0 
// >> 1 
// >> 2 
// >> 3 

Diese Antwort basiert weg: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

+0

Seltsamerweise scheint diese Lösung ungefähr die Hälfte der Zeit zu funktionieren. Manchmal schafft es die letzte Datenmenge zu den Abonnenten, manchmal nicht. Irgendeine Idee warum? Der Wechsel zu switchMap hat es auch nicht behoben. –

0

Dies erreichen Sie mit den Operatoren retry und first.

// helper observable that can return incomplete/complete data or fail. 
 
var server = Rx.Observable.create(function (observer) { 
 
    var x = Math.random(); 
 

 
    if(x < 0.1) { 
 
    observer.next(true); 
 
    } else if (x < 0.5) { 
 
    observer.error("error"); 
 
    } else { 
 
    observer.next(false); 
 
    } 
 
    observer.complete(); 
 

 
    return function() { 
 
    }; 
 
}); 
 
    
 
function isComplete(data) { 
 
    return data; 
 
} 
 
    
 
var delay = 1000; 
 
Rx.Observable.interval(delay) 
 
    .switchMap(() => { 
 
    return server 
 
     .do((data) => { 
 
     console.log('Server returned ' + data); 
 
     },() => { 
 
     console.log('Server threw'); 
 
     }) 
 
     .retry(3); 
 
    }) 
 
    .first((data) => isComplete(data)) 
 
    .subscribe(() => { 
 
    console.log('Got completed value'); 
 
    },() => { 
 
    console.log('Got error'); 
 
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>