2017-07-10 3 views

Antwort

2

Sie bauen können Ihre eigene Schleife, basierend auf onErrorHandleWith:

def retryLimited[A](fa: Observable[A], maxRetries: Int) 
    (p: Throwable => Boolean): Observable[A] = { 

    // If we have no retries left, return the source untouched 
    if (maxRetries <= 0) fa else 
    fa.onErrorHandleWith { err => 
     // If predicate holds, do recursive call 
     if (p(err)) 
     retryLimited(fa, maxRetries - 1)(p) 
     else 
     Observable.raiseError(err) 
    } 
} 

Wenn Sie (ich) nicht einfache Funktionen mögen, können Sie immer einige Erweiterungsmethoden als Alternative aussetzen:

implicit class ObservableExtensions[A](val self: Observable[A]) 
    extends AnyVal { 

    def onErrorRetryLimited(maxRetries: Int) 
    (p: Throwable => Boolean): Observable[A] = 
    retryLimited(self, maxRetries)(p) 
} 

Hinweis auf die Antwort, @JVS ist in Geist OK, kann aber Problematisch sein, weil es den gemeinsamen veränderlichen Zustand beibehält, der für kalte Observable nicht in Ordnung ist. So bemerken, was passiert, wenn man so etwas tun:

val source = Observable.suspend { 
    if (Random.nextInt() % 10 != 0) 
    Observable.raiseError(new RuntimeException("dummy")) 
    else 
    Observable(1, 2, 3) 
} 

val listT = source 
    .onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart)) 
    .toListL 

listT.runAsync // OK 
listT.runAsync // Ooops, shared state, we might not have retries left 

Seien Sie vorsichtig bei wandelbar gemeinsamen Staat in beobachtbare der Betreiber. Sie können natürlich so arbeiten, aber Sie müssen sich der Gefahr bewusst sein :-)

+0

Vielen Dank für den Hinweis auf dieses potentielle Problem! – JVS

0

Warnung: Dies verwendet den gemeinsamen veränderlichen Status und kann für kalte Observablen falsch sein. Siehe Alexandrus Antwort.

eine Funktion definieren, es zu tun:

def limitedRetries(maxRetries: AtomicInt, shouldRetryOnException: Throwable => Boolean): Throwable => Boolean = 
    ex => maxRetries.decrementAndGet() > 0 && shouldRetryOnException(ex) 

Und mit dieser Funktion in onErrorRestartIf

.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart)) 

FYI, verwendet hier monix AtomicInt ...

import monix.execution.atomic.AtomicInt 
Verwandte Themen