2017-09-02 3 views
1

Wie in Reactive x (idealerweise mit Beispielen in RxJava oder RxJs) kann dies erreicht werden?unendlichen Strom endlicher Ströme in einen unendlichen Strom umwandeln - Reaktiv X

a |-a-------------------a-----------a-----------a---- 
s1 |-x-x-x-x-x-x -| (subscribe) 
s2      |-x-x-x-x-x-| (subscribe) 
s2            |-x-x-x-x-x-| (subscribe) 
... 
sn 
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe) 

a ist eine unendliche Strom von Ereignissen, die finite Strom auslösen sn der Ereignisse, von denen jeder einen Teil des unendlichen Stroms S während der Lage, zu jedem sn Strom zu abonnieren (um Summationsoperationen zu tun) sein sollte aber zumin die gleiche Zeit halten Strom S als unendlich.

EDIT: Um konkreter zu sein, biete ich die Umsetzung von dem, was ich in Kotlin suche. Alle 10 Sekunden wird ein Ereignis ausgegeben, das auf den gemeinsamen endlichen Strom von 4 Ereignissen abbildet. Der Metastream ist flatMap -ed in normalen unendlichen Strom. Ich verwende doAfterNext, um jeden endlichen Strom zusätzlich zu abonnieren und Ergebnisse auszugeben.

/** Creates a finite stream with events 
* $ch-1 - $ch-4 
*/ 
fun createFinite(ch: Char): Observable<String> = 
     Observable.interval(1, TimeUnit.SECONDS) 
       .take(4) 
       .map({ "$ch-$it" }).share() 

fun main(args: Array<String>) { 

    var ch = 'A' 

    Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
      .map { createFinite(ch++) } 
      .doAfterNext { 
       it 
         .count() 
         .subscribe({ c -> println("I am done. Total event count is $c") }) 
      } 
      .flatMap { it } 
      .subscribe { println("Just received [$it] from the infinite stream ") } 

    // Let main thread wait forever 
    CountDownLatch(1).await() 
} 

Allerdings bin ich mir nicht sicher, ob dies der 'reine RX' Weg ist.

+1

Das sieht nach 'concatMap' aus, aber es ist unklar, wie man jedes Ereignis auf eine Reihe von N inneren Quellen abbildet. – akarnokd

+1

Vielleicht fügen Sie ein Beispiel von dem, was Sie bisher versucht haben, das wird uns eine bessere Vorstellung davon geben, was Sie erreichen möchten. – paulpdaniels

+0

http://i0.kym-cdn.com/photos/images/original/000/173/576/Wat8.jpg - Ich lese den Titel – inf

Antwort

0

Sie machen nicht klar, wie Sie das Zählen machen möchten. Wenn Sie eine Gesamtzählung tun, dann gibt es keine Notwendigkeit das Innere Abonnement zu tun:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it } 
     .doOnNext(counter.incrementAndget()) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

Auf der anderen Seite, wenn Sie einen Zähler für jede Zwischen beobachtbaren zur Verfügung stellen müssen, dann können Sie das Zählen bewegen innen die flatMap() und aus der Zählung ausdrucken und nach Abschluss zurück:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it 
        .doOnNext(counter.incrementAndget() 
        .doOnCompleted({ long ctr = counter.getAndSet(0) 
             println("I am done. Total event count is $ctr") 
            }) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

das ist nicht sehr funktionell, aber diese Art der Berichterstattung neigt normale Ströme zu brechen.

Verwandte Themen