2015-02-23 9 views
6

Ich möchte RxJava verwenden, um Daten von einem Web-Service (über Retrofit) zu laden. Ich habe auch einen Datenbankcache von vorherigen Ergebnissen.Android + RxJava - Laden von Daten aus db UND Web-Service

Nehmen wir an, ich schon Observablen für jeden von ihnen haben:

Observable<List<MyModel>> networkObservable = retrofitService.getModels(); 
Observable<List<MyModel>> dbObservable = database.getModels(); 

Ich möchte diese 2 Observablen zu einem verschmelzen:

public class MyModelHelper { 
    public static Observable<List<MyModel>> getModels() { 
     // TODO: Help! 
    } 
} 

Das Verhalten Ich möchte nur für Abonnenten, die Datenbank zu erhalten Ergebnisse sobald verfügbar, gefolgt von den restService-Ergebnissen, wenn sie eingehen (vorausgesetzt, das Abrufen von der Datenbank ist schneller als der Netzwerkanruf)

Beste, was ich tun konnte, auf mein eigenes ist:

public class MyModelHelper { 
    public static Observable<List<MyModel>> getModels() { 
     List<MyModel> emptyList = new LinkedList<>(); 

     // 'startWith' because combineLatest wont call back until all source observables emit something 
     Observable.combineLatest(dbObservable.startWith(emptyList), 
      networkObservable.startWith(emptyList), 
      new Func2<List<MyModel>, List<MyModel>, List<MyModel>>() { 
       @Override 
       public List<MyModel> call(List<MyModel> first, List<MyModel> second) { 
        return merge(first, second); 
      } 
     }); 
    } 
} 

Dieses ein wenig hacky mir scheint, und ich fühle mich für ein solches gemeinsames Szenario muss es eine bessere Lösung.

Es wäre auch schön, wenn ein Fehler im Netzwerk beobachtbar wäre, dass die db-Ergebnisse immer noch durchlaufen würden. Ich könnte onErrorResumeNext() anrufen und das dbObservable selbst zurückgeben, aber ich möchte noch Abonnenten benachrichtigt werden, dass ein Fehler aufgetreten ist.

Irgendwelche Vorschläge?

Antwort

13

Verwenden Sie Observable.merge direkt. Es führt mehrere beobachtbare Streams zu einem zusammen. Wenn die Datenbank also schneller sendet, erhalten Sie sie zuerst.

public static Observable<List<MyModel>> getModels() { 
    return Observable.merge(dbObservable, networkObservable); 
} 
+1

Nun, das schien fast peinlich offensichtlich! Ich habe 'mergeDelayError()' verwendet, um sicherzustellen, dass beide Observablen eine Chance haben, zurückzukehren. Vielen Dank! – DanielGrech

+5

Darüber hinaus hilft 'mergeDelayError' Ihnen, das gewünschte Fehlerverhalten zu erhalten. Wenn einer der Streams einen Fehler aufweist, wirkt sich dies nicht auf den anderen aus, bis er abgeschlossen ist. Danach können Sie den Fehler mit dem gewünschten Fehleroperator beheben. – lopar

+0

@lopar sehr guter Punkt. Es lohnt sich auch, das Marmordiagramm für 'merge' und' mergeDelayError' zu betrachten, um es richtig zu verstehen http://reactivex.io/documentation/operators/merge.html – tomrozb