Wenn wir die beobachtbare:Rx Programmierung, wie man Element mit früherem Element in single Observable kombinieren?
1 -> 2 -> 3 -> 4 -> 5 -> ...
wie man die neuen beobachtbar zu konstruieren:
(1, 2) -> (3, 4) -> ...
Vielleicht ist die Frage zu kurz, aber ich finde wirklich nicht, wie Sie achieve.Thank
Danke jeder, ich finde eine Methode und unter betracht entferne var
import java.util.concurrent.TimeUnit
import rx.lang.scala.{Subject, Observable}
import scala.concurrent.duration.Duration
object ObservableEx {
implicit class ObservableImpl[T](src: Observable[T]) {
/**
* 1 -> 2 -> 3 -> 4 ->...
* (1,2) -> (3,4) -> ...
*/
def pair: Observable[(T, T)] = {
val sub = Subject[(T, T)]()
var former: Option[T] = None //help me to kill it
src.subscribe(
x => {
if (former.isEmpty) {
former = Some(x)
}
else {
sub.onNext(former.get, x)
former = None
}
},
e => sub.onError(e),
() => sub.onCompleted()
)
sub
}
}
}
object Test extends App {
import ObservableEx._
val pair = Observable.interval(Duration(1L, TimeUnit.SECONDS)).pair
pair.subscribe(x => println("1 - " + x))
pair.subscribe(x => println("2 - " + x))
Thread.currentThread().join()
}
Ich mochte Var überhaupt nicht, danke nochmal!
ENDLICH Ich bekomme einen leichten Weg, Hoffnung kann anderen helfen.
def pairPure[T](src: Observable[T]): Observable[(T, T)] = {
def pendingPair(former: Option[T], sub: Subject[(T, T)]): Unit = {
val p = Promise[Unit]
val subscription = src.subscribe(
x => {
if (former.isEmpty) {
p.trySuccess(Unit)
pendingPair(Some(x), sub)
}
else {
sub.onNext(former.get, x)
p.trySuccess(Unit)
pendingPair(None, sub)
}
},
e => sub.onError(e),
() => sub.onCompleted()
)
p.future.map{x => subscription.unsubscribe()}
}
val sub = Subject[(T,T)]()
pendingPair(None, sub)
sub
}
Andere Antworten auch sehr hilfreich ~
Mögliches Duplikat von [Holen Sie das vorherige Element in IObservable, ohne die Sequenz neu zu bewerten] (http://stackoverflow.com/questions/2820685/get-previous-element-in-iobservable-without-re-evaluating-the- Sequenz) –
@KeithHall, Ich habe einen anderen ähnlichen Weg finden, indem Sie eine neue beobachtbare erstellen. "Scan" ist eine gute Idee, aber ich möchte auch meine Idee vervollständigen.Wenn Sie einen Rat haben, vielen Dank. – LoranceChen