2016-06-06 10 views
0

Wenn wir die beobachtbare:Rx Programmierung, wie man Element mit früherem Element in single Observable kombinieren?

1 -> 2 -> 3 -> 4 -> 5 -> ... 

wie man die neuen beobachtbar zu konstruieren:

(1, 2) -> (3, 4) -> ... 

Vielleicht ist die Frage zu kurz, aber ich finde wirklich nicht, wie Sie achieve.Thank

Danke jeder, ich finde eine Methode und unter betracht entferne var

import java.util.concurrent.TimeUnit 

import rx.lang.scala.{Subject, Observable} 

import scala.concurrent.duration.Duration 
object ObservableEx { 
    implicit class ObservableImpl[T](src: Observable[T]) { 
    /** 
     * 1 -> 2 -> 3 -> 4 ->... 
     * (1,2) -> (3,4) -> ... 
     */ 
    def pair: Observable[(T, T)] = { 
     val sub = Subject[(T, T)]() 
     var former: Option[T] = None //help me to kill it 
     src.subscribe(
     x => { 
      if (former.isEmpty) { 
      former = Some(x) 
      } 
      else { 
      sub.onNext(former.get, x) 
      former = None 
      } 
     }, 
     e => sub.onError(e), 
     () => sub.onCompleted() 
     ) 

     sub 
    } 
    } 
} 

object Test extends App { 
    import ObservableEx._ 
    val pair = Observable.interval(Duration(1L, TimeUnit.SECONDS)).pair 
    pair.subscribe(x => println("1 - " + x)) 
    pair.subscribe(x => println("2 - " + x)) 

    Thread.currentThread().join() 
} 

Ich mochte Var überhaupt nicht, danke nochmal!

ENDLICH Ich bekomme einen leichten Weg, Hoffnung kann anderen helfen.

def pairPure[T](src: Observable[T]): Observable[(T, T)] = { 
    def pendingPair(former: Option[T], sub: Subject[(T, T)]): Unit = { 
    val p = Promise[Unit] 
    val subscription = src.subscribe(
     x => { 
     if (former.isEmpty) { 
      p.trySuccess(Unit) 
      pendingPair(Some(x), sub) 
     } 
     else { 
      sub.onNext(former.get, x) 
      p.trySuccess(Unit) 
      pendingPair(None, sub) 
     } 
     }, 
     e => sub.onError(e), 
     () => sub.onCompleted() 
    ) 

    p.future.map{x => subscription.unsubscribe()} 
    } 

    val sub = Subject[(T,T)]() 
    pendingPair(None, sub) 
    sub 
} 

Andere Antworten auch sehr hilfreich ~

+0

Mögliches Duplikat von [Holen Sie das vorherige Element in IObservable, ohne die Sequenz neu zu bewerten] (http://stackoverflow.com/questions/2820685/get-previous-element-in-iobservable-without-re-evaluating-the- Sequenz) –

+0

@KeithHall, Ich habe einen anderen ähnlichen Weg finden, indem Sie eine neue beobachtbare erstellen. "Scan" ist eine gute Idee, aber ich möchte auch meine Idee vervollständigen.Wenn Sie einen Rat haben, vielen Dank. – LoranceChen

Antwort

0

Sie können tumblingBuffer mit count = 2 verwenden eine Observable von Seq s der Länge 2 und unter Verwendung von map zu bekommen, können Sie sie in Paare drehen:

implicit class ObservableImpl[T](src: Observable[T]) { 
    def pair: Observable[(T, T)] = { 
    def seqToPair(seq: Seq[T]): (T, T) = seq match { 
     case Seq(first, second) => (first, second) 
    } 
    src.tumblingBuffer(2).map(seqToPair) 
    } 
} 

Beachten Sie, dass dies, wenn die Anzahl der Elemente wird scheitern in die Quelle Observable ist ungerade, so dass Sie diesen Fall in seqToPair abdecken müssen.

+0

Ich bekomme auch einen leichten Weg.Ihr ist auch toll. – LoranceChen

0

Versuchen groupBy Operator. Viel Glück.

+0

Hallo, mein Freund, kann Observable [T] nicht zu Observable [(T, T)] machen? – LoranceChen

Verwandte Themen