2014-09-03 17 views
7

Ich habe zwei Observables (zur Vereinfachung A und B genannt) und einen Abonnenten. Der Subskribent abonniert A, und wenn es einen Fehler auf A gibt, dann tritt B (welches der Fallback ist) ein. Nun, wenn A einen Fehler trifft, wird B fein aufgerufen, aber A ruft onComplete() auf dem Subskribenten auf, also B-Antwort erreicht den Abonnenten nie, auch wenn B-Ausführung erfolgreich ist.RxJava onErrorResumeNext()

Ist das das normale Verhalten? Ich dachte onErrorResumeNext() sollte den Stream fortsetzen und den Abonnenten benachrichtigen, sobald er wie in der Dokumentation (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext) beschrieben fertig ist.

ist dies die Gesamtstruktur, was ich tue (mehrere „langweilig“ Code weggelassen):

public Observable<ModelA> observeGetAPI(){ 
    return retrofitAPI.getObservableAPI1() 
      .flatMap(observableApi1Response -> { 
       ModelA model = new ModelA(); 

       model.setApi1Response(observableApi1Response); 

       return retrofitAPI.getObservableAPI2() 
         .map(observableApi2Response -> { 
          // Blah blah blah... 
          return model; 
         }) 
         .onErrorResumeNext(observeGetAPIFallback(model)) 
         .subscribeOn(Schedulers.newThread()) 
      }) 
      .onErrorReturn(throwable -> { 
       // Blah blah blah... 
       return model; 
      }) 
      .subscribeOn(Schedulers.newThread()); 
} 

private Observable<ModelA> observeGetAPIFallback(ModelA model){ 
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> { 
     // Blah blah blah... 
     return model; 
    }).onErrorReturn(throwable -> { 
     // Blah blah blah... 
     return model; 
    }) 
    .subscribeOn(Schedulers.immediate()); 
} 

Subscription subscription; 
subscription = observeGetAPI.subscribe(ModelA -> { 
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE... 
}, throwable ->{ 
    // WE NEVER GET HERE... onErrorResumeNext() 
}, 
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED } 
); 

Irgendwelche Ideen, was mache ich falsch?

Danke!

EDIT: Hier ist eine grobe Zeitleiste von dem, was passiert:

---> HTTP GET REQUEST B 
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS) 

---> HTTP GET REQUEST A 
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!) 

---> HTTP GET FALLBACK A 
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time. 
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS) 

Und hier ist ein Link zu einem einfachen Diagramm I, das ist gemacht darstellen, was ich passieren soll: Diagram

+0

Ihre Zeitleiste zeigt HTTP 200 für die Fehlerreaktion an. Gibt es eine andere Möglichkeit, einen Fehler von getObservableAPI2() zu signalisieren? Können Sie außerdem angeben, welche API-Anforderungen der Zeitleistenausgabe entsprechen? Es sieht aus wie getObservableAPI1-> REQUEST B, getObservableAPI2-> REQUEST A, getObservableAPI3-> FALLBACK A, aber ich möchte nur sicherstellen. – kjones

+0

Ja, obwohl die Antwort tatsächlich eine 200 ist, können einige Daten null sein, also werfe ich und Fehler in diesen Szenarien. Und ja, das ist die Timeline-Requests-Beziehung. Ich werde die Frage so schnell wie möglich bearbeiten, damit sie der Timeline-Anfrage entspricht. – mradzinski

+0

Ihre Logik sieht gut aus. Sie sollten die Fallback-Antwort vor onComplete erhalten. Können Sie alle subscribeOn() -Aufrufe entfernen und sehen, was passiert? Sie sollten nicht notwendig sein, da Retrofit die Anfragen ohnehin auf seinem eigenen Thread-Pool ausführt. – kjones

Antwort

5

Die Rx Im Folgenden verwendete Aufrufe sollten simulieren, was Sie mit Retrofit tun.

fallbackObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting A Fallback"); 
         subscriber.onNext("A Fallback"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Fallback Error"); 
         return "Fallback Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.immediate()); 

stringObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting B"); 
         subscriber.onNext("B"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .flatMap(new Func1<String, Observable<String>>() { 
        @Override 
        public Observable<String> call(String s) { 
         logger.v("flatMapping B"); 
         return Observable 
           .create(new Observable.OnSubscribe<String>() { 
            @Override 
            public void call(Subscriber<? super String> subscriber) { 
             logger.v("emitting A"); 
             subscriber.onNext("A"); 
             subscriber.onCompleted(); 
            } 
           }) 
           .delay(1, TimeUnit.SECONDS) 
           .map(new Func1<String, String>() { 
            @Override 
            public String call(String s) { 
             logger.v("A completes but contains invalid data - throwing error"); 
             throw new NotImplementedException("YUCK!"); 
            } 
           }) 
           .onErrorResumeNext(fallbackObservable) 
           .subscribeOn(Schedulers.newThread()); 
        } 
       }) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Return Error"); 
         return "Return Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.newThread()); 

subscription = stringObservable.subscribe(
     new Action1<String>() { 
      @Override 
      public void call(String s) { 
       logger.v("onNext " + s); 
      } 
     }, 
     new Action1<Throwable>() { 
      @Override 
      public void call(Throwable throwable) { 
       logger.v("onError"); 
      } 
     }, 
     new Action0() { 
      @Override 
      public void call() { 
       logger.v("onCompleted"); 
      } 
     }); 

Die Ausgabe aus den Log-Aussagen ist:

 
RxNewThreadScheduler-1 emitting B 
RxComputationThreadPool-1 flatMapping B 
RxNewThreadScheduler-2 emitting A 
RxComputationThreadPool-2 A completes but contains invalid data - throwing error 
RxComputationThreadPool-2 emitting A Fallback 
RxComputationThreadPool-1 onNext A Fallback 
RxComputationThreadPool-1 onCompleted 

Dies scheint wie das, was Sie suchen, aber vielleicht bin ich etwas fehlt.