2017-08-17 1 views
0

Neu bei RxJava und Reaktive Programmierung sozusagen.Rx Java Karte funktioniert parallel

Ich versuche, zwei Funktionen parallel als Teil einer einzigen Observable-Pipeline abzubilden, aber das scheint nicht so zu funktionieren. Hier ist mein Code.

Observable.fromCallable(thatReturnsNumberOne()) 
       .observeOn(newThread()) 
       .map(doubleIt()) 
       .observeOn(newThread()) 
       .map(doubleIt()) 
       .subscribe(testSubscriber); 

Ich möchte die 2 doubleIt() -Aufrufe zur gleichen Zeit erzeugt werden. Aber es scheint immer so zu sein, dass sobald das erste doubleIt() beendet ist, erst dann das zweite beginnt. dh blockierend/sequentiell.

Was fehlt mir?

Antwort

1

Ich nehme an, thatReturnsNumberOne() gibt nur einen einzelnen Wert zurück. Der zurückgegebene Wert wird nacheinander an die einzelnen Operatoren übergeben. Indem Sie observeOn(newThread()) verwenden, ändern Sie nur zu einem neuen Thread, wenn der Wert zu diesem Punkt in der Kette gelangt.

Wenn Sie tun Berechnungen parallel wollen, müssen Sie mehrere Observablen verwenden:

Observable.fromCallable(thatReturnsNumberOne()) 
    .flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()) 
     .combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()), 
     doubles -> doubles[0] + doubles[1])) 
    .subscribe(testSubscriber);