Ich habe den folgenden Code:RxJava: beobachtbare und Standard-Thread
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
Und hier ist der Ausgang:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
Wichtiges Detail: Ich bin die subscribe
Methode von einem anderen Thread aufrufen (main
Faden in Android).
sieht also wie Observable
Klasse synchron und ist standardmäßig und führt alles (Operatoren wie map
+ anmelde Abonnenten) auf dem gleichen Thread, die Ereignisse emittiert (s.onNext
), nicht wahr? Ich frage mich ... ist es beabsichtigtes Verhalten oder ich habe gerade etwas falsch verstanden? Eigentlich habe ich erwartet, dass mindestens onNext
und onComplete
Callbacks auf dem Thread des Aufrufers aufgerufen werden, nicht auf dem einen, der Ereignisse ausgibt. Verstehe ich richtig, dass in diesem speziellen Fall der tatsächliche Thread des Aufrufers keine Rolle spielt? Zumindest wenn Ereignisse asynchron generiert werden.
Eine weitere Sorge - was passiert, wenn ich einige Observable als Parameter von einer externen Quelle (dh ich erstelle es nicht allein) ... es gibt keine Möglichkeit für mich als Benutzer zu prüfen, ob es ist synchron oder asynchron und ich muss nur explizit angeben, wo ich Callbacks über subscribeOn
und observeOn
Methoden erhalten möchte, oder?
Danke!