2016-05-11 14 views
6

Ich frage mich, ob es eine Möglichkeit gibt, bestehende Operatoren zu komponieren, um das Gegenteil von switchMap() durchzuführen.RxJava - Gegenteil von switchMap() Operator?

Die switchMap() wird nach der letzten Emission jagen, die sie erhält, und alle Observable löschen, die sie zuvor ausgeführt hat. Nehmen wir an, ich habe es umgedreht, und ich möchte alle Emissionen ignorieren, die zu einem Operator kommen, während er mit der ersten empfangenen Emission beschäftigt ist. Es wird die Emissionen ignorieren, bis es den aktuellen Observable innerhalb davon ausstößt. Dann verarbeitet es die nächste Emission, die es empfängt.

Observable.interval(1, TimeUnit.SECONDS) 
     .doOnNext(i -> System.out.println("Source Emitted Value: " + i)) 
     .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation())) 
     .subscribe(i -> System.out.println("Subscriber received Value: " + i)); 

Gibt es eine Möglichkeit, dies zu erreichen? In dem obigen Beispiel, wenn intensiveProcess() waren drei Sekunden dauern, würde die ignoreWhileBusyMap()0 verarbeiten aber wahrscheinlich Emissionen 1 und 2 von interval() kommen ignorieren würde .Es dann 3 verarbeitet aber wahrscheinlich ignorieren 4 und 5, und so weiter ...

Antwort

5

sicher, Tor die Verarbeitung eines Wertes von einem boolean, die festgelegt wird, nachdem die Verarbeitung beendet:

AtomicBoolean gate = new AtomicBoolean(true); 

Observable.interval(200, TimeUnit.MILLISECONDS) 
.flatMap(v -> { 
    if (gate.get()) { 
     gate.set(false); 

     return Observable.just(v).delay(500, TimeUnit.MILLISECONDS) 
       .doAfterTerminate(() -> gate.set(true)); 
    } else { 
     return Observable.empty(); 
    } 
}) 
.take(10) 
.toBlocking() 
.subscribe(System.out::println, Throwable::printStackTrace); 

bearbeiten

Alternative:

Observable.interval(200, TimeUnit.MILLISECONDS) 
.onBackpressureDrop() 
.flatMap(v -> { 
    return Observable.just(v).delay(500, TimeUnit.MILLISECONDS); 
}, 1) 
.take(10) 
.toBlocking() 
.subscribe(System.out::println, Throwable::printStackTrace); 

Sie können onBackpressureDrop-onBackpressureLatest ändern sich mit dem letzten Wert weiterhin sofort.

+0

Awesome, ich habe etwas Ähnliches mit einem 'Semaphore' gemacht, aber ich hatte gehofft, eine rein reaktive Zusammensetzung mit existierenden Operatoren zu verwenden. Ich nehme an, ich könnte das alles in einem 'Transformer' verpacken. – tmn

+0

Verwenden Sie einen verzögerten Transformator, um zu vermeiden, dass das Gate über mehrere Endteilnehmer verteilt wird. – akarnokd

+0

Habe gerade gemerkt, dass deine Lösung nicht so blockhaft ist wie meine, also wechsle ich dazu. Vielen Dank! – tmn

0

Um Jeopardy Stil zu beantworten: Was ist concatMap?

concatMap wird zur ersten Observable abonnieren und werden nicht an nachfolgende Observable s, bis die vorherigen Observable Anrufe onComplete() abonnieren.

In dieser Hinsicht ist es das "Gegenteil" von switchMap, das eifrig von vorherigen Observable s abmeldet, wenn ein neues kommt.

concatMap will alles hören, dass jede Observable zu sagen hat, während switchMap ein sozialer Schmetterling ist, und bewegt sich, sobald eine andere beobachtbare verfügbar ist.

+0

Nicht ganz, wenn Sie die Frage genau lesen, werden Sie sehen, dass ich ein Verhalten verfolge, das wenig mit' concatMap' oder seinem wahrgenommenen Gegenteil zu tun hat.Um Ihrer Analogie zu folgen, suchte ich nach einem "xxxMap" -Operator, der sich darauf konzentrierte, mit dem ersten beobachtbaren Observablen zu sprechen und nachfolgenden Observablen zu sagen "nicht jetzt, ich bin beschäftigt, mit diesem Typen zu reden". Erst wenn sein Gespräch beendet ist, wird er einem anderen Observablen gestatten, sich mit ihm zu beschäftigen. – tmn

+0

Das ist, was 'concatMap' tut. – Andy

+0

Ich glaube 'concatMap()' ist das gleiche wie 'flatMap()', aber es verschachtelt nicht. Es garantiert, dass alle Emissionen letztendlich emittiert werden, auch wenn sie in die Warteschlange gestellt werden. Aber der Betreiber, nach dem ich gefragt habe, ignoriert einfach nachfolgende Emissionen, während er beschäftigt ist. – tmn