Ich habe durch die Beispiele in dem Buch Reactive Programming with RxJava gearbeitet, die auf Version 1 nicht gerichtet ist 2. Eine Einführung in unendliche Ströme hat das folgende Beispiel (und stellt fest, es gibt bessere Möglichkeiten, mit dem umzugehen concurrency):RxJava 2 äquivalent zu isUnsubscribe
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler =() -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
jedoch in RxJava 2 ist der Lambda-Ausdruck zum create()
Methode übergeben vom Typ ObservableEmitter
und dies hat keinen isUnsubscribed()
Methode. Ich habe in What's Different in 2.0 gesucht und auch eine Suche im Repository durchgeführt, kann aber keine solche Methode finden.
Wie würde diese gleiche Funktionalität in 2.0 erreicht werden?
Edited schließen Lösung wie unten angegeben (N. B. mit Kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
Großartig, das hat den Trick gemacht. Vielen Dank. – amb85