2016-10-20 1 views
1

Ich habe Observable.fromEmitter() als eine fantastische Alternative zu Observable.create(). Ich bin in letzter Zeit in ein seltsames Verhalten geraten und kann nicht recht herausfinden, warum das so ist. Ich würde wirklich jemanden mit etwas Wissen über Gegendruck und Scheduler schätzen, die sich das ansehen.RxJava Observable.fromEmitter ungerade Gegendruck Verhalten

public final class EmitterTest { 
    public static void main(String[] args) { 
    Observable<Integer> obs = Observable.fromEmitter(emitter -> { 
     for (int i = 1; i < 1000; i++) { 
     if (i % 5 == 0) { 
      sleep(300L); 
     } 

     emitter.onNext(i); 
     } 

     emitter.onCompleted(); 
    }, Emitter.BackpressureMode.LATEST); 

    obs.subscribeOn(Schedulers.computation()) 
     .observeOn(Schedulers.computation()) 
     .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128" 

    sleep(10000L); 
    } 

    private static void sleep(Long duration) { 
    try { 
     Thread.sleep(duration); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } 
    } 
} 

Der Ausgang dieser Anwendung ist

Received 1 
Received 2 
... 
Received 128 

Dann es bei 128 stecken bleibt (assumedly weil diese Standardpuffergröße des RxJava ist). Wenn ich den in fromEmitter() angegebenen Modus zu BackpressureMode.NONE ändere, funktioniert der Code wie vorgesehen. Wenn ich den Anruf zu observeOn() entferne, funktioniert es auch wie beabsichtigt. Kann jemand erklären, warum das so ist?

+0

Das ist seltsam, es überhaupt nicht aufhören sollte. Es stoppt, auch wenn toBlocking() oder eine kleinere Puffergröße in observeOn verwendet wird. Ich werde das weiter untersuchen. – akarnokd

+0

Was entspricht Observable.fromEmitter in RX Java 2.0? – Mike6679

+1

'Observable.create()' ersetzt 'fromEmitter()' in 2.0. Wenn Sie das alte, gruselige Erstellungsverhalten verwenden möchten, verwenden Sie 'Observable.unsafeCreate()'. –

Antwort

2

Dies ist eine gleich Pool Deadlock-Situation. subscribeOn plant den Downstream request auf dem gleichen Thread, den es verwendet, aber wenn dieser Thread mit einer Schlaf-/Emissionsschleife beschäftigt ist, wird die Anfrage nie an fromEmitter geliefert und somit beginnt nach einigen Zeit LATEST Elemente bis zum Ende fallen zu lassen, wo die sehr Der letzte Wert (999) wird ausgegeben, wenn die Hauptquelle lange genug wartet. (Dies ist eine ähnliche Situation mit onBackpressureBlock, die wir entfernt haben.)

Wenn subscribeOn diese Terminierung von Anforderungen nicht ausgeführt hat, würde das Beispiel propery funktionieren.

habe ich an issue geöffnet, um die Lösungen zu erarbeiten.

Die Abhilfe für jetzt größere Puffergröße zu verwenden, mit observeOn (Überlast ist), oder verwenden fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

+0

Danke für die knappe Antwort, David! Bin dankbar. –

1

Das ist nicht merkwürdig, dies zu erwarten ist.

Lasst uns die Anrufe verfolgen. Beginnen Sie mit:

Observable.subscribe(Subscriber<? super T> subscriber)

Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

und so weiter. Schauen Sie in den Konstruktor:

OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
    this.scheduler = scheduler; 
    this.delayError = delayError; 
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
} 

Wenn Sie nicht über den Puffer angeben, der Standard ist RxRingBuffer.SIZE, welche Größe Plattform abhängig ist.

Aus diesem Grund, wenn Sie observeOn Operator ohne Puffergröße aufrufen, die Standard-128 (16 auf Android).

Die Lösung für dieses Problem ist sehr einfach: nur einen weiteren observeOn Operator und Puffergröße erklären. Aber wenn Sie die Puffergröße für 1000 (so viele wie vom Emitter kommende Elemente) deklarieren, wird das Programm immer noch beendet, ohne alle Werte zu emittieren (ungefähr 170). Warum? Weil das Programm endet. Der Haupt-Thread endet nach 10 000 Sekunden und die Berechnung erfolgt in einem anderen Thread (Schedulers.computation()). Die Lösung dafür? Verwenden Sie CountdownLatch. Seien Sie sich bewusst, verwenden Sie es nie Produktion, es ist nur für Tests helphul.

+0

Danke für die Antwort! Letztendlich habe ich Davids Antwort vorgezogen, aber ich weiß es zu schätzen, dass Sie sich bemüht haben, die Anrufe zu analysieren. –

Verwandte Themen