2016-11-17 6 views
1

Ich benutze den Interval-Operator, und ich möchte weiterhin Elemente ausgeben, auch wenn eine Exception in meiner Pipeline passiert.Interval funktioniert weiter nach onErrorResumeNext

Also versuche ich onErrorResumeNext verwenden ein Element im Ausnahmefall emittieren. Aber ich habe gesehen, dass nach diesem Punkt das Intervall keine weiteren Elemente mehr emittiert.

Hier mein Gerät testen.

@Test 
public void testIntervalObservableWithError() { 
    Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS) 
      .map(time -> "item\n") 
      .map(item -> item = null) 
      .map(String::toString) 
      .onErrorResumeNext(t-> Observable.just("item with error emitted")) 
      .subscribe(System.out::print, t->{ 
         System.out.println(t); 
        } 
        ); 
    TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription); 
    testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS); 
} 

Ich bin mit diesem Verhalten verwirren, warum die beobachtbare abmelden, wenn Es ist ein Element Empfang von onErrorResumeNext

LÖSUNG:

Nach einigen Erklärungen, merke ich, dass, wenn ein Fehler passiert die beobachtbaren t's komplett. Also schließe ich das Observable ein, das eine Ausnahme in eine andere Observable haben kann und ich benutze flatMap. Also, dann die Haupt Observable weiter emittieren Elemente.

@Test 
public void testIntervalObservableWithError() { 
    Observable.interval(100, TimeUnit.MILLISECONDS) 
      .map(time -> "item\n") 
      .flatMap(item -> Observable.just(item) 
        .map(String::toString)) 
      .subscribe(System.out::print); 
    TestSubscriber testSubscriber = new TestSubscriber(); 
    testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS); 
} 

Wenn wurde leider jeder Betreiber, der alle diese Magie tun kann, was ich wissen möchte.

Regrads

Antwort

1

Ihr Abonnement unterbricht, weil, wenn onErrorResumeNext ausgelöst wird, Upstream bereits mit einem Fehler beendet. Und Sie geben nur ein Element aus, statt dass die Ausnahme flussabwärts geht. Um den Upstream am Leben zu erhalten, müssen Sie verhindern, dass eine Ausnahme ausgelöst wird.

Für Ihre speziellen Beispiel Lösung kann so aussehen:

... 
    .map(time -> "item\n") 
    .map(item -> item = null) 
    .map(item -> { 
     try { 
      return item.toString(); 
     } catch (NullPointerException e) { 
      return "item with error emitted"; 
    }) 
    //no onErrorResumeNext() 
    .subscribe ... 

onErrorResumeNext ersetzt nur Fehler mit einem Element und ruft onComplete.

+0

Hallo, bitte lesen Sie mein Codebeispiel. Ich benutze onErrorResumeNext und es funktioniert nicht. – paul

+0

Ich habe zuerst Ihren Code gelesen. In Ihrem Fall ist es unmöglich, 'interval' am Leben zu halten, da Sie' .map (String :: toString) 'auf einem NULL-Element aufrufen, der Stream wird sofort mit einem Fehler beendet und dieser Fehler wird von' onResumeErrorNext() 'abgefangen sendet ein Element und stream ruft "onComplete" auf. Das ist die Antwort auf Ihre Frage ** Warum die beobachtbare abbestellen? **. Auch ich habe meine Antwort aktualisiert. –

+0

Und wieder aktualisiert. –

1

Der von RxJava verwendete Vertrag Strom ist, dass, sobald ein Fehler ausgegeben wird nicht mehr Produkte emittiert werden sollen. Wenn Ihr Anwendungsfall erfordert, dass der Stream nach dem Fehler fortgesetzt wird, müssen Sie den Fehler in eine onNext Emission umwandeln. Erstellen Sie ein Wrapper-Typ sagen ValueOrError<T> und beginnen in Bezug auf Observable<ValueOrError<T>> denken:

Observable<Integer> source = ... 
Observable<ValueOrError<Integer>> o = 
    source.map(x -> { 
    try { 
     return new ValueOrError<>(mightThrow(x)); 
    } 
    catch (Throwable e) { 
     return new ValueOrError<>(e); 
    });