2015-02-08 5 views
5

Wenn ich ein Observalbe haben:RxJava Observable und Abonnent für das Überspringen von Ausnahmen?

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4); 
Observable<Integer> o1 = Observable.from(ints); 

Ich möchte eine andere beobachtbar erzeugen, die Kluft von 12:

Observable<Integer> o2 = o1.map(i -> 12/i); 
o2.subscribe(
    v -> logger.info ("Subscriber value {}", v) , 
    t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage()) 
); 

Es ist offensichtlich, es eine Fehlermeldung anzeigt, und gestoppt, wenn sie begegnen ‚0‘:

RxTest - Subscriber value 12 
RxTest - Subscriber value 6 
RxTest - Subscriber onError class java.lang.ArithmeticException :/by zero 

Aber was, wenn ich möchte, dass der Observer (o2) die Ausnahme auslässt?

Ich sehe in RxJava Doc über error handling, gibt es keine Möglichkeit, den Fehler zu überspringen. Die onErrorResumeNext() und onExceptionResumeNext() benötigt eine Backup/FallbackObservable, die nicht was ich will. Der onErrorReturn muss den Rückgabewert angeben.

Alle drei Fehlerbehandlungsmethoden können den ursprünglichen Beobachter nicht fortsetzen. zum Beispiel:

Observable<Integer> o2 = o1.map(i -> 12/i) 
     .onErrorReturn(t -> 0); 

Er druckt:

RxTest - Subscriber value 12 
RxTest - Subscriber value 6 
RxTest - Subscriber value 0 

Nicht

die einzige Lösung, um den Rest 12/3 und 12/4 Druck scheint in der map Funktion Relais:

Observable<Integer> o2 = o1.map(i -> { 
    try { 
    return Optional.of(12/i); 
    } catch (ArithmeticException e) { 
    return Optional.empty(); 
    } 
}).filter(Optional::isPresent) 
    .map(o -> (Integer) o.get()); 

Es funktioniert, aber es ist umständlich. Ich frage mich, ob es eine Möglichkeit ist leicht jeden RuntimeException zu überspringen, wenn die Manipulation Observable (wie map)

Die oben über Ausnahme in Observable Skipping ist. Im Folgenden geht es um das Überspringen Ausnahme in der Subscriber:

Die Situation ist die gleiche:

List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4); 
Observable<Integer> o1 = Observable.from(ints); 
o1.subscribe(
    i -> logger.info("12/{} = {}", i, 12/i), 
    t -> logger.error("{} : {}", t.getClass() , t.getMessage()), 
() -> logger.info("onCompleted") 
); 

Es druckt:

Wenn Ausnahme in onNext auftritt, löst es onError und NOT REAKTION auf Daten von Observable. Wenn ich möchte, dass der Abonnent die Ausnahme verschluckt, muss ich versuchen, die ArithmeticException in der onNext() zu fangen. Gibt es eine sauberere Lösung?

Es scheint, wenn ein Subscriber mit einem Fehler in der onNext() konfrontiert ist, die innerhalb (onNext) nicht behandelt werden kann, soll es aufhören, oder? Ist es ein gutes Design?

+1

So wie ich es betrachten, 'OnNext()' entspricht 'Iterator.next()' - und wenn ein Problem eine Sammlung gibt es Iterieren - eine Ausnahme ausgelöst wird und der Iterator zieht mit (es doesn 't "resume" iterating) - siehe zB [ConcurrentModificationException] (http://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html). Das ist die gleiche Art von Verhalten, die wir hier haben. Das heißt, @benjchristensen könnte mehr Licht auf das Thema werfen. – alfasin

+0

Das 'Optional'-Mapping ist wahrscheinlich meine bevorzugte Art, dies zu handhaben. Es ist ziemlich klar, und der Filter isst Ereignisse schön. Es ist eine ziemlich gute Möglichkeit, es zu kapseln. Ich hätte in Scala einen "Try" -Typ verwendet und danach gefiltert. – BeepDog

Antwort

1

Versuchen Sie diesen Weg.

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty()); 

Im obigen Code werden Fehlerereignisse durch einen leeren Stream ersetzt und der ursprüngliche Stream wird fortgesetzt. Folglich werden Fehler übersprungen.

+0

Entschuldigung, es gibt keine 'Observable.onErrorFlatMap()' in RxJava 1.0 ... – smallufo

+0

Oh, Entschuldigung, ich hatte die Version nicht bemerkt. Aber es ist möglich, einen "onErrorFlatMap" -ähnlichen Kombinator zu implementieren, der 'materialize' und' dematerialize' verwendet, obwohl ein wenig schwierig. – findall

+0

Ich bin auch neugierig, warum solche Methode in 1.0 verschwinden. – smallufo

5
Observable.just(1, 2, 3, 0, 4) 
      .flatMap(i -> Observable.defer(() -> Observable.just(12/i)) 
        .onErrorResumeNext(Observable.just(0))); 

es ein Weg, obwohl zu gehen, bedenken Sie, dass RxJava geht davon aus, dass Fehler etwas ist wirklich unexcpeted (können Sie Werte erwarten 0 sein). Auf der anderen Seite, wenn Sie Division durch 0 Ausnahme ignorieren möchten, sollten Sie Ihre Werte vor der Division filtern/zuordnen.

+0

Die einzige Sache, die ich hier ändern würde, ist 'Observable.from (1, 2, 3, 0, 4)' zu 'Observable.just (1, 2, 3, 0, 4)', weil in 1.0.0 es würde nicht mit 'from' kompilieren. – meddle

0
import rx.lang.scala._ 

import scala.concurrent.duration.DurationDouble 
import scala.util.{Failure, Success, Try} 

val o1 = List(1, 2, 3, 0, 4) toObservable 
val o2 = List(1, 2, 3, 4, 5) toObservable 

val o3 = o1 zip o2 map {case (i1, i2) => i2/i1 } // this will trigger a division by 0 error 

val o4 = o3 lift { 
     subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) { 
     override def onNext(v: Int) { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onNext(Success(v)) 
     } 

     override def onError(e: Throwable) { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onNext(Failure(e)) 
     } 

     override def onCompleted() { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onCompleted() 
     } 
     } 
    } 

o4 subscribe(println(_)) 
Verwandte Themen