0

Ich bin neu bei RxJava. Ich habe ein paar Jersey RxJava Kunden, die Observables zurückgeben. Ich muss einen Anruf tätigen, um einige Daten zu erhalten, diese Daten werden die Eingabe für meine nächsten 3 Anrufe. Ich möchte, dass diese Anrufe parallel gemacht werden. Schließlich möchte ich eine Berechnung durchführen, sobald alle Anrufe abgeschlossen sind, die alle Daten erfordert. Hier ist, wie es aussieht:Wie kann ich mehrere beobachtbare Aufrufe asynchron aufrufen, aber gleichzeitig vor und nach diesen Aufrufen einige Berechnungen durchführen?

interface Service { 
    Observable<ResultA> callServiceA(InitialData input); 
    Observable<ResultB> callServiceB(ResultA resultA); 
    Observable<ResultC> callServiceC(ResultA resultA); 
    Observable<ResultD> callServiceD(ResultA resultA); 
    FinalResult simpleCalculation(ResultA a, ResultB b, ResultC c, ResultD d); 
} 

class MyClass{ 

    @Autowired 
    ExecutorService myExecutorService; 

    Observable<FinalResult> myMethod(InitialData initialData){ 
    /* Make call to ServiceA, get the results, then make calls to services B, C, and D in parallel (on different threads), finally perform simpleCalculation, and emit the result */ 
    } 
} 

Antwort

0

Sie flatMap verwenden können, einen synchronen Aufruf zu tun, dann zip verwenden oder fusionieren mehrere Anrufe auszusenden, dann flatMap, dass wieder, wenn Sie fertig.

+1

Können Sie mir geben Beispielcode? – Adam

4

flatMap() und zip() sind Ihre Freunde in dieser Situation.

Observable<FinalResult> myMethod(InitialData initialData) { 
    return service 
      .callServiceA(initialData) 
      .flatMap(resultA -> Observable.zip(
        service.callServiceB(resultA), 
        service.callServiceC(resultA), 
        service.callServiceD(resultA), 
        (resultB, resultC, resultD) -> 
         service.simpleCalculation(resultA, resultB, resultC, resultD)) 
      ); 
} 

Mit der Rückkehr beobachtbar wird wie folgt aussehen:

Subscription subscription = 
     myMethod(new InitialData()) 
       .subscribe(finalResult -> { 
          // FinalResult will end up here. 
         }, 
         throwable -> { 
          // Handle all errors here. 
         }); 
+0

Danke! Wie kann ich die Dienste B, C und D parallel auf verschiedenen Threads aufrufen lassen? Muss ich keinen Scheduler hinzufügen? – Adam

+0

So viele Leute verwenden Retrofit für ihre Netzwerkanrufe, die Anrufe automatisch parallel planen. Wenn Ihr Dienst diese synchron ausführt, fügen Sie für jeden Aufruf .subscribeOn (Schedulers.io()) hinzu. – kjones

+0

Ich verwende kein Retrofit. Ich benutze Jersey Client. In der Vergangenheit habe ich .observeOn (Schedulers.from (executorService)) am Ende der Kette hinzugefügt. Wie unterscheidet sich das von .subscribeOn? – Adam

Verwandte Themen