Mein Anwendungsfall ist: Ich bekomme eine Liste von Permalinks und muss zwei REST-Anforderungen per Permalink ausgeben, um ihre Daten in Teilen zu erhalten. Wenn beide Anfragen zurück sind, möchte ich ihre Informationen zusammenführen und etwas damit machen (hier - drucken Sie es aus). Ich möchte es mit Code unter Verwendung des zip
Operators tun. Hier ist mein aktueller Code (zusammen mit Mocks für die Bibliothek verwende ich):Fehlerbehandlung für gezippte Observables
public class Main {
public static void main(String[] args) {
ContentManager cm = new ContentManager();
Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
if (dataContent == null || streamUrlContent == null) {
System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
return Observable.empty();
}
return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}))
.subscribe(System.out::println);
}
}
class SubscribingRestCallback implements RestCallback {
private final Subscriber<? super Content> subscriber;
public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}
@Override
public void onFailure(int code, String message) {
System.err.println(message);
subscriber.onNext(null);
subscriber.onCompleted();
}
}
public class Content {
public final String permalink;
public final String logoUrl;
public final String streamUrl;
public Content(String permalink, String logoUrl, String streamUrl) {
this.permalink = permalink;
this.logoUrl = logoUrl;
this.streamUrl = streamUrl;
}
@Override
public String toString() {
return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
}
}
public interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public List<String> getPermalinks(int n) {
List<String> permalinks = new ArrayList<>(n);
for (int i = 1; i <= n; ++i) {
permalinks.add("perma_" + i);
}
return permalinks;
}
public void getDataByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, false);
}
public void getStreamByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, true);
}
private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
// simulate network latency and unordered results
new Thread(() -> {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
String logoUrl;
String streamUrl;
if (stream) {
logoUrl = null;
streamUrl = "http://" + permalink + "/stream";
} else {
logoUrl = "http://" + permalink + "/logo.png";
streamUrl = null;
}
callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
} else {
callback.onFailure(-1, permalink + " data failure");
}
}).start();
}
}
Im Allgemeinen ist es funktioniert, aber ich mag es nicht, den Fehler in dieser Implementierung der Handhabung. Grundsätzlich können die REST-Anforderungen fehlschlagen. In diesem Fall ruft die Methode onFailure
subscriber.onNext(null)
auf, so dass die Methode zip
immer etwas zu tun hat (eine Anforderung ist möglicherweise fehlgeschlagen, die andere möglicherweise nicht, und ich weiß nicht, welche fehlgeschlagen ist). Dann, in der zip
Funktion brauche ich eine if
, die überprüft, dass beide nicht null
sind (mein Code wird abstürzen, wenn einer der Teil Content
s ist null
).
Ich würde gerne in der Lage sein, die null
mit dem Operator filter
irgendwo, wenn möglich zu filtern. Oder gibt es einen besseren Weg als null
Werte für den Fehlerfall zu senden, aber so dass es immer noch mit der zip
Funktion funktioniert?
Warum nennst du 'subscriber.onNext (null)' 'von onFailure' Methode? In diesem Fall soll 'subscriber.onError (throwable)' aufgerufen werden. –
Da 'onError()' 'den gesamten Stream ausfällt, will ich das nicht. Ich möchte so viele Ergebnisse wie möglich sammeln und die Fehler im Grunde ignorieren/ausfiltern. Oder liege ich falsch? – wujek
Auch wenn Sie nicht möchten, dass der gesamte Stream fehlschlägt, müssen Sie immer noch die 'subscriber.onError()' Methode aufrufen. Es gibt einige andere Möglichkeiten, die Fehler zu beheben. Einer von ihnen ist ein "onErrorResumeNext" -Operator. Hier ist ein Beispiel wie man es benutzt - https://gist.github.com/nsk-mironov/3270b103fc3326a325e2. –