2017-08-21 5 views
2

Ich habe eine Observable, die Daten aussendet und ich möchte es zunächst für drei Sekunden puffern und dann muss es eine Verschiebung von einer Sekunde nach dem Anfangspuffer geben. Dies ist eher wie buffer(timespan,unit,skip), wo der Skip auf der Zeitspanne ist.RxJava Sliding Window

Probe:

ObservableData,TimeStamp : (5,1),(10,1.5),(30,2.8),(40,3.2),(60,3.8),(90,4.2) 

ExpectedList : {5,10,30},{10,30,40,60},{30,40,60,90} 

Ich kann dies erreichen, indem Sie einen benutzerdefinierten Betreiber zu schaffen. Ich möchte nur wissen, gibt es einen Weg, es zu tun, ohne auf den benutzerdefinierten Betreiber angewiesen zu sein.

Antwort

0

Ich denke, dass es mit eingebauten Operatoren gelöst werden kann. Code demonstriert einen der Ansätze, obwohl Dinge kann schwierig werden, wenn für heiße oder nicht leichten Kältequellen verwendet - ich Sie ermutigen, es zu benutzen für Bildungs ​​/ get-ein-Idee Zwecke und nicht die Produktion verwenden

@Test 
fun slidingWindow() { 
    val events = Flowable.just(
      Data(5, 1.0), 
      Data(10, 1.5), 
      Data(30, 2.8), 
      Data(40, 3.2), 
      Data(60, 3.8), 
      Data(90, 4.2)) 
      .observeOn(Schedulers.io()) 
    val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet() 
    Assert.assertNotNull(windows) 
    Assert.assertFalse(windows.isEmpty()) 
} 

fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
     from = 0, 
     to = windowSize, 
     slideSize = slideSize, 
     data = data) 

fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> { 
    val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data } 
    val tail = data.skipWhile { it.time <= from + slideSize } 
    val nonEmptyWindow = window.toList().filter { !it.isEmpty() } 
    val nextWindow = nonEmptyWindow.flatMapPublisher { 
     window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io()) 
    } 
    return nonEmptyWindow.toFlowable().concatWith(nextWindow) 
} 

data class Data(val data: Int, val time: Double) 

Der obige Test ergibt
[[5, 10, 30], [10, 30, 40, 60], [30, 40, 60, 90], [40, 60, 90], [90]]

+0

Dank es hilft bei der Erreichung der gewünschten Lösung in meinem Fall – Sagar