Ich habe Observable.fromEmitter()
als eine fantastische Alternative zu Observable.create()
. Ich bin in letzter Zeit in ein seltsames Verhalten geraten und kann nicht recht herausfinden, warum das so ist. Ich würde wirklich jemanden mit etwas Wissen über Gegendruck und Scheduler schätzen, die sich das ansehen.RxJava Observable.fromEmitter ungerade Gegendruck Verhalten
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Der Ausgang dieser Anwendung ist
Received 1
Received 2
...
Received 128
Dann es bei 128 stecken bleibt (assumedly weil diese Standardpuffergröße des RxJava ist). Wenn ich den in fromEmitter()
angegebenen Modus zu BackpressureMode.NONE
ändere, funktioniert der Code wie vorgesehen. Wenn ich den Anruf zu observeOn()
entferne, funktioniert es auch wie beabsichtigt. Kann jemand erklären, warum das so ist?
Das ist seltsam, es überhaupt nicht aufhören sollte. Es stoppt, auch wenn toBlocking() oder eine kleinere Puffergröße in observeOn verwendet wird. Ich werde das weiter untersuchen. – akarnokd
Was entspricht Observable.fromEmitter in RX Java 2.0? – Mike6679
'Observable.create()' ersetzt 'fromEmitter()' in 2.0. Wenn Sie das alte, gruselige Erstellungsverhalten verwenden möchten, verwenden Sie 'Observable.unsafeCreate()'. –