2017-02-24 2 views
1

Ich bin neu bei RxJava 2 und möchte einen Completable Server-API-Aufruf wiederholen, bis er erfolgreich ist, während Benachrichtigungen der Wiederholungsversuche ausgegeben werden, sodass meine Benutzeroberfläche den Wiederholungsstatus anzeigen kann der Benutzer.RxJava 2: Erneut abschließen, während Wiederholungsbenachrichtigungen an Abonnenten gesendet werden

Etwas wie folgt aus:

public Observable<RetryAttempt> retryServerCall() { 

    // execute Completable serverCall() 

    // if an error is thrown, emit new RetryAttempt(++retryCount, error) to subscriber 

    // retry until serverCall() is successful 
} 

public Completable serverCall(); 

public class RetryAttempt { 
    public RetryAttempt(int retryCount, Throwable cause); 
} 

ich ein paar verschiedene Ansätze ausprobiert habe und lief in Hindernisse. Am nächsten kommt dieser Ansatz, indem ein umschließendes Observable erstellt wird und onNext()/onComplete()/onError() explizit aufgerufen wird.

public Observable<RetryAttempt> retryServerCall() { 
    final int[] retryCount = {0}; 
    return Observable.create(e -> 
     serverCall() 
       .doOnError(throwable -> e.onNext(new RequestHelp.RetryAttempt(++retryCount[0], throwable))) 
       .retry() 
       .subscribe(() -> e.onComplete(), throwable -> e.onError(throwable))); 
} 

Vielleicht ist es ein wenig eine periphere Angelegenheit, aber ich hatte für retryCount um die final Array zu verwenden, um den Fehler variable used in lambda should be final or effectively final zu vermeiden.

Ich weiß, es muss ein besser sein, dies mit Rx Voodoo zu erreichen. Jede Anleitung wird sehr geschätzt!

+0

Sie wollen nicht, es genau so tun, wie Sie Abmelde Signalisierung verlieren. –

+0

@Tassos Wahr. Ich kann 'subscribeWith' verwenden, um ein' Disposable' für das innere 'Observable' zu ​​erhalten und dann über' setDisposable' zu ​​entsorgen, richtig? – HolySamosa

Antwort

0
public Single<List<Farmer>> getAllFarmers(long timestamp) { 

    return Observable.fromCallable(() -> mapiFactory.getAllFarmerAboveTime(timestamp)) 
      .doOnError(throwable -> Log.d(TAG, "Error calling getAllFarmers: "+throwable.getMessage())) 
      .retryWhen(new RetryWithDelay(5,1000)) 
      .concatMap(farmersResponse -> Observable.fromIterable(farmersResponse.farmer)) 
      .filter(farmer -> !StringUtils.isBlank(farmer.cnic)) 
      .map(this::validateCnic) 
      .distinct(farmer -> farmer.cnic) 
      .toList(); 

} 

wenn fromCallable() -Methode Wurf Ausnahme .retryWhen (neu RetryWithDelay (5,1000)) wird hier führen wir ab 1000

Start der api für 5mal bei exponentiellen Verzögerung wiederholen und hier RetryWithDelay ist

public class RetryWithDelay implements Function<Observable<Throwable>, 
    Observable<?>> { 

private final int _maxRetries; 
private final int _retryDelayMillis; 
private int _retryCount; 

public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { 
    _maxRetries = maxRetries; 
    _retryDelayMillis = retryDelayMillis; 
    _retryCount = 0; 
} 


@Override 
public Observable<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception { 

    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { 
     @Override 
     public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception { 

       if (++_retryCount < _maxRetries) { 

       // When this Observable calls onNext, the original 
       // Observable will be retried (i.e. re-subscribed) 

       Log.d(TAG, String.format("Retrying in %d ms", _retryCount * _retryDelayMillis)); 

       return Observable.timer(_retryCount * _retryDelayMillis, TimeUnit.MILLISECONDS); 
      } 

      // Max retries hit. Pass an error so the chain is forcibly completed 
      // only onNext triggers a re-subscription (onError + onComplete kills it) 
      return Observable.error(throwable); 
     } 

    }); 
} 

}

Verwandte Themen