2017-06-13 6 views
2

Ich habe durch die Beispiele in dem Buch Reactive Programming with RxJava gearbeitet, die auf Version 1 nicht gerichtet ist 2. Eine Einführung in unendliche Ströme hat das folgende Beispiel (und stellt fest, es gibt bessere Möglichkeiten, mit dem umzugehen concurrency):RxJava 2 äquivalent zu isUnsubscribe

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { 
    Runnabler =() -> { 
     BigInteger i = ZERO; 
     while (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(i); 
      i = i.add(ONE); 
     } 
    }; 
    new Thread(r).start(); 
}); 

... 

Subscription subscription = naturalNumbers.subscribe(x -> log(x)); 
/* after some time... */ 
subscription.unsubscribe(); 

jedoch in RxJava 2 ist der Lambda-Ausdruck zum create() Methode übergeben vom Typ ObservableEmitter und dies hat keinen isUnsubscribed() Methode. Ich habe in What's Different in 2.0 gesucht und auch eine Suche im Repository durchgeführt, kann aber keine solche Methode finden.

Wie würde diese gleiche Funktionalität in 2.0 erreicht werden?

Edited schließen Lösung wie unten angegeben (N. B. mit Kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter -> 
    Thread({ 
     var int: BigInteger = BigInteger.ZERO 
     while (!emitter.isDisposed) { 
      emitter.onNext(int) 
      int = int.add(BigInteger.ONE) 
     } 
    }).start() 
} 

val first = naturalNumbers.subscribe { log("First: $it") } 
val second = naturalNumbers.subscribe { log("Second: $it") } 

Thread.sleep(5) 
first.dispose() 
Thread.sleep(5) 
second.dispose() 

Antwort

2

Nachdem Sie auf beobachtbare abonnieren, Disposable zurückgegeben. Sie können es in Ihrer lokalen Variable speichern und disposable.isDisposed() überprüfen, um zu sehen, ob es immer noch abonniert oder nicht.

+1

Großartig, das hat den Trick gemacht. Vielen Dank. – amb85