2016-04-06 4 views
1

Das Problem ist, das doppelte Looping Verhalten mit RX zu emulieren:Gebäude doppelt unendliches Wahlverhalten in RX

while True: 
    try: 
     token = get_token() 
     while True: 
      try: 
       value = get_value_using_token(token) 
       do_something(value) 
      except: 
       break 
    except: 
     break 

Es wäre sauber, wenn die beiden Schleifen mit zwei Observablen ersetzt werden, wobei ein als Beobachter der äußere, während do_something(value) durch einen Beobachter allein ersetzt werden kann. Alle Ausnahmen können ebenfalls gut gehandhabt werden. Die äußere Schleife muss blockiert werden, die innere Schleife jedoch möglicherweise nicht, da ich versuche, die äußere Schleife zu verwenden, um Ausnahmen mithilfe der Wiederholungsfunktion mit einer Backoff-Funktion zu behandeln.

Bisher kann ich unter Verwendung einer Sequenz aufbauen:

Observable.from_iterable(value for value in iter(get_token, None)) 
    .subscribe(do_something) 

aber wie kann ich eine ähnliche Struktur machen Modus für die Außen in blockiert?

Antwort

1

Sie müssen nur den Operator Repeat verwenden, um eine Schleife zu erstellen. Und dann brauchen Sie den Operator Retry, um bei einem Fehler fortzufahren.

So etwas wie

Observable.return(get_token()) 
    .flatMap(token->Observable.return(get_value_using_token(token)) 
     .repeat()) 
    .retry() 
.subscribe(do_something) 

* Ich Python weiß nicht, so dass ich hoffe, dass Sie diesen Code psuedo umwandeln können

+0

Danke. Wiederholter Operator gibt jedoch den gleichen Wert zurück. Was ich brauche, ist den Funktionsaufruf zu wiederholen, da weder 'get_token' noch' get_value_using_token' eine reine Funktion ist. –

+0

Sorry, natürlich. Dann können Sie Observable.Return zu RxPy's Äquivalenz von 'Observable.Start' ändern oder Sie können Observable.Defer (Obs.Return (GetToken())) verwenden, damit es wieder faul ausgewertet wird. –

0

Was ich tat am Ende war einen unendlichen Strom von Funktionen repeat Operator zu erstellen, und map zu seiner Invokation.

def get_token(): 
    return some_value 

def get_value_with_token(token): 
    return some_value_using_token 

Observable.repeat(get_token)\ 
    .map(lambda f: f())\ 
    .map(lambda n: O.repeat(lambda: get_value_with_token(n)))\ 
    .concat_all()\ 
    .map(lambda f: f())\ 
    .subscribe(logger.info) 

wo get_token und get_value_with_token sind Funktionen.

Durch die Verwendung von Blockierungsfunktionen für beide, kann ich eine Doppelschleife machen und zusätzliche rx Operatoren wie retry auf die Observable anwenden.