2017-12-26 4 views
0

Lets sagen, dass ich ein Login und Benutzerdaten-Methode haben, HTTP-Aufrufe darstellt:RxJava: Wie kann ich eine mehrstufige Operation umbrechen, die den Schritt an Beobachter zurückgibt?

Single<LoginResponse> login(); 
Single<UserData> userData(); 

Ich brauche login() dann userData() zu nennen. Wenn beide erfolgreich sind, ist der Benutzer angemeldet.

Ich weiß, wie man sie in einem z. Completable:

Completable performLogin() { 
    login().doOnSuccess(this::storeLoginResponse) 
     .flatMap(userData()) 
     .doOnSuccess(this::storeUserData) 
     .doOnError(this::wipeLoginData) 
     .toCompletable(); 
} 

So dann die UI sagt

showLoading(); 
performLogin().subscribe(() -> { 
    stopLoading(); 
    onLoginSuccess(); 
}, error -> { 
    stopLoading(); 
    onLoginFailure(); 
}); 

Was passiert, wenn die Benutzeroberfläche, welche Stufe der Lade geschieht zeigen muss? Wie in, wenn der Anruf login() beendet wird und der userData() Anruf beginnt, wird es die Benutzeroberfläche ändern?

Was ich dachte, der ist so etwas wie

Observable<LoginStage> performLogin() { 
    return Observable.create(emitter -> { 
     login.doOnSuccess(response -> { 
      storeLoginResponse(response) 
      emitter.onNext(LOGIN_COMPLETE) 
     }).flatMap(userData()) 
     .subscribe(userData -> { 
      storeUserData(userData); 
      emitter.onNext(USER_DATA_COMPLETE) 
      emitter.onComplete(); 
     }, error -> { 
      wipeLoginData(); 
      emitter.onError(error); 
     }); 
    }); 
} 

Aber es fühlt sich an wie es ist ein netter oder Rx-y Weg, es zu tun.

Antwort

0

Sie können heiße Observable verwenden und eine Observable zu einem anderen Subjekt verketten und alle Elemente von einer Emission zu einer anderen weiterleiten, wenn Sie sie benötigen.

@Test 
public void chainObservablesWithSubject() throws InterruptedException { 
    Observable<Long> observable = Observable.from(Arrays.asList(1l, 2l, 3l, 4l)); 
    Subject<Long, Long> chainObservable = ReplaySubject.create(1); 
    observable.subscribe(chainObservable); 
    chainObservable.subscribe(System.out::println, (e) -> System.err.println(e.getMessage()), System.out::println); 
} 

Sie können weitere Beispiele hier überprüfen https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/connectable/HotObservable.java

Verwandte Themen