2016-09-02 4 views
2

Ich habe ein Problem bei der Verwendung des RxJava Concat-Operators. Ich habe zwei Observablen, die ersten aussendet Ergebnisse aus einer Server-Datenbank und die andere emittiert Ergebnisse aus der lokalen Datenbank, und dann concat ich das:Realm + Retrofit + RxJava: Concat und SubscribeOn

// Uses a Realm in the UI thread 
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId); 

// Uses Retrofit 
Observable<MyResult> localObservable = mLocalDataSource.find(tId); 

Observable.concat(localObservable, remoteObservable) 
    .doOnNext(result -> /* Do my stuff */) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .doOnError(throwable -> throwable.printStackTrace()) 
    .subscribe() 

Also das macht mir Problem, da ich nicht subscribeOn() bin mit dem verkettet Observable läuft auf AndroidScheduler.MainThread() und dies führt nicht die Fernbedienung und es startet eine NetworkOnMainThreadException.

Wenn ich eine subscribeOn(Schedulers.computation()) implementieren, bekomme ich , da natürlich die Observable nicht auf dem Thread läuft die Realm-Instanz existiert.

Ich habe in anderen Fragen gesucht und ich habe nichts nützliches bekommen, ich habe das Beispiel von Realm überprüft: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java aber seltsamerweise sehe ich, dass die Retrofit Observable auf nichts abonniert ist und es funktioniert.

Warum funktioniert es auf der Probe und in meinem Code kann ich nicht das gleiche tun? Irgendein Vorschlag?

+0

... ist es wirklich Ihre "lokale" Observable, die Retrofit verwendet? – EpicPandaForce

Antwort

2

Ich glaube, Sie sollten subscribeOn() an den richtigen Stellen verwenden.

// Uses a Realm in the UI thread 
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread()); 

// Uses Retrofit 
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Subscribers.io()); 

Observable.concat(realmObservable, retrofitObservable) 
    .doOnNext(result -> /* Do my stuff */) 
    .subscribeOn(AndroidSchedulers.mainThread()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .doOnError(throwable -> throwable.printStackTrace()) 
    .subscribe() 

Sehen Sie, ob es Ihr Problem behebt.

+0

Danke dafür, ich habe es auch angenommen und es versucht, aber das Verhalten meiner App wurde seltsam und nur das erste Observable im Concat funktionierte. Ich bin mir sicher, dass ich etwas anderes gemacht habe, aber ich habe bereits meine ursprüngliche Implementierung geändert. Trotzdem Danke. –

2

Sie können Ihre lokalen und Remote-Observablen concat wie unten:

// Uses a Realm in the UI thread 
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId); 

// Uses Retrofit 
Observable<MyResult> localObservable = mLocalDataSource.find(tId); 

Observable.concat(localObservable, remoteObservable).first() 
       .map(new Func1<MyResult, MyResult>() { 
        @Override 
        public myResult call(MyResult result) { 
         if (result == null) { 
          throw new IllegalArgumentException(); 
         } 
         return result; 
        } 
       }); 

und abonnieren wie unten:

CompositeSubscription mCompositeSubscription = new CompositeSubscription(); 
final Subscription subscription = mRepo.find(tId 
       .subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Observer<MyResult>() { 
        @Override 
        public void onCompleted() { 
         // Completed 
        } 

        @Override 
        public void onError(Throwable e) { 
         // onError 
        } 

        @Override 
        public void onNext(MyResult result) { 
         //onSuccess 
        } 
       }); 
mCompositeSubscription.add(subscription); 

Sie dieses Repo überprüfen können für RxJava + Retrofit + Realm https://github.com/savepopulation/wikilight

Viel Glück!

Verwandte Themen