2015-06-13 14 views
8

Mein Anwendungsfall ist: Ich bekomme eine Liste von Permalinks und muss zwei REST-Anforderungen per Permalink ausgeben, um ihre Daten in Teilen zu erhalten. Wenn beide Anfragen zurück sind, möchte ich ihre Informationen zusammenführen und etwas damit machen (hier - drucken Sie es aus). Ich möchte es mit Code unter Verwendung des zip Operators tun. Hier ist mein aktueller Code (zusammen mit Mocks für die Bibliothek verwende ich):Fehlerbehandlung für gezippte Observables

public class Main { 

    public static void main(String[] args) { 
     ContentManager cm = new ContentManager(); 

     Observable 
       .from(cm.getPermalinks(10)) 
       .flatMap(permalink -> Observable.zip(
         Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
         Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
         (dataContent, streamUrlContent) -> { 
          if (dataContent == null || streamUrlContent == null) { 
           System.err.println("not zipping " + dataContent + " and " + streamUrlContent); 
           return Observable.empty(); 
          } 

          return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
         })) 
       .subscribe(System.out::println); 
    } 
} 

class SubscribingRestCallback implements RestCallback { 

    private final Subscriber<? super Content> subscriber; 

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) { 
     this.subscriber = subscriber; 
    } 

    @Override 
    public void onSuccess(Content content) { 
     subscriber.onNext(content); 
     subscriber.onCompleted(); 
    } 

    @Override 
    public void onFailure(int code, String message) { 
     System.err.println(message); 
     subscriber.onNext(null); 
     subscriber.onCompleted(); 
    } 
} 

public class Content { 

    public final String permalink; 

    public final String logoUrl; 

    public final String streamUrl; 

    public Content(String permalink, String logoUrl, String streamUrl) { 
     this.permalink = permalink; 
     this.logoUrl = logoUrl; 
     this.streamUrl = streamUrl; 
    } 

    @Override 
    public String toString() { 
     return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl); 
    } 
} 

public interface RestCallback { 

    void onSuccess(Content content); 

    void onFailure(int code, String message); 
} 

class ContentManager { 

    private final Random random = new Random(); 

    public List<String> getPermalinks(int n) { 
     List<String> permalinks = new ArrayList<>(n); 
     for (int i = 1; i <= n; ++i) { 
      permalinks.add("perma_" + i); 
     } 

     return permalinks; 
    } 

    public void getDataByPermalink(String permalink, RestCallback callback) { 
     getByPermalink(permalink, callback, false); 
    } 

    public void getStreamByPermalink(String permalink, RestCallback callback) { 
     getByPermalink(permalink, callback, true); 
    } 

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) { 
     // simulate network latency and unordered results 
     new Thread(() -> { 
      try { 
       Thread.sleep(random.nextInt(1000) + 200); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      if (random.nextInt(100) < 95) { 
       String logoUrl; 
       String streamUrl; 
       if (stream) { 
        logoUrl = null; 
        streamUrl = "http://" + permalink + "/stream"; 
       } else { 
        logoUrl = "http://" + permalink + "/logo.png"; 
        streamUrl = null; 
       } 
       callback.onSuccess(new Content(permalink, logoUrl, streamUrl)); 
      } else { 
       callback.onFailure(-1, permalink + " data failure"); 
      } 
     }).start(); 
    } 
} 

Im Allgemeinen ist es funktioniert, aber ich mag es nicht, den Fehler in dieser Implementierung der Handhabung. Grundsätzlich können die REST-Anforderungen fehlschlagen. In diesem Fall ruft die Methode onFailuresubscriber.onNext(null) auf, so dass die Methode zip immer etwas zu tun hat (eine Anforderung ist möglicherweise fehlgeschlagen, die andere möglicherweise nicht, und ich weiß nicht, welche fehlgeschlagen ist). Dann, in der zip Funktion brauche ich eine if, die überprüft, dass beide nicht null sind (mein Code wird abstürzen, wenn einer der Teil Content s ist null).

Ich würde gerne in der Lage sein, die null mit dem Operator filter irgendwo, wenn möglich zu filtern. Oder gibt es einen besseren Weg als null Werte für den Fehlerfall zu senden, aber so dass es immer noch mit der zip Funktion funktioniert?

+0

Warum nennst du 'subscriber.onNext (null)' 'von onFailure' Methode? In diesem Fall soll 'subscriber.onError (throwable)' aufgerufen werden. –

+0

Da 'onError()' 'den gesamten Stream ausfällt, will ich das nicht. Ich möchte so viele Ergebnisse wie möglich sammeln und die Fehler im Grunde ignorieren/ausfiltern. Oder liege ich falsch? – wujek

+0

Auch wenn Sie nicht möchten, dass der gesamte Stream fehlschlägt, müssen Sie immer noch die 'subscriber.onError()' Methode aufrufen. Es gibt einige andere Möglichkeiten, die Fehler zu beheben. Einer von ihnen ist ein "onErrorResumeNext" -Operator. Hier ist ein Beispiel wie man es benutzt - https://gist.github.com/nsk-mironov/3270b103fc3326a325e2. –

Antwort

5

Zunächst einmal der richtige Weg, um eine Subscriber über einen Fehler zu benachrichtigen ist subscriber.onError Methode aufzurufen:

class SubscribingRestCallback implements RestCallback { 
    private final Subscriber<? super Content> subscriber; 

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) { 
     this.subscriber = subscriber; 
    } 

    @Override 
    public void onSuccess(Content content) { 
     subscriber.onNext(content); 
     subscriber.onCompleted(); 
    } 

    @Override 
    public void onFailure(int code, String message) { 
     subscriber.onError(new Exception(message)); 
    } 
} 

Auch wenn Sie scheitern nicht der ganze Strom wollen, können Sie immer noch ein Bedarf anrufen subscriber.onError() Methode. Es gibt einige andere Möglichkeiten, die Fehler zu beheben. Einer von ihnen ist ein onErrorResumeNext Betreiber:

Observable 
     .from(cm.getPermalinks(10)) 
     .flatMap(permalink -> Observable.zip(
       Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       (dataContent, streamUrlContent) -> { 
        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
       }).onErrorResumeNext(Observable.empty())) 
     .subscribe(System.out::println); 

EDIT

ich noch eine letzte Frage: Wenn Sie meinen Reißverschluss Funktionen bemerken, ich Observable.empty() zurückgeben, wenn die beiden Objekte nicht gezippt werden, und sobald ich Inhalt zurückgeben. Das scheint falsch zu sein. Wie sollte ich mit solchen Fehler Bedingungen in der Reißverschlussfunktion umgehen?

Ja, die Rückkehr Observable.empty() ist völlig falsch. Werfen scheint eine Ausnahme von zip Funktion wie die beste Lösung:

Observable 
     .from(cm.getPermalinks(10)) 
     .flatMap(permalink -> Observable.zip(
       Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       (dataContent, streamUrlContent) -> { 
        if (!isDataValid(dataContent, streamUrlContent)) { 
         throw new RuntimeException("Something went wrong."); 
        } 
        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
       }).onErrorResumeNext(Observable.empty())) 
     .subscribe(System.out::println); 
+0

Was ich schließlich tat war: die Methoden 'RestCallback # onFailure()' rufen 'subscriber.onError()' auf und im 'RestCallback # onSuccess()' rufe ich 'onNext() 'und' onComplete() 'if auf der Inhalt ist gültig, oder 'onError()' wenn nicht. Dann behandelt das 'onResumeNext()', das auf das gezippte 'Observable' angewendet wird, auch diese Art von Inhaltsfehlern und meine Zipping-Funktion ist so einfach wie' (dataContent, streamUrlContent) -> neuer Inhalt (dataContent.permalink, dataContent. logoUrl, streamUrlContent.streamUrl)) '. Ich habe den Aufruf 'doOnError()' hinzugefügt, um zusätzlich zu protokollieren. Ich mag den Code jetzt. Danke für Ihre Hilfe. – wujek