2015-03-04 6 views
21

Hier ist ein Bild von dem, was ich versuche zu erreichen.Split Rx Observable in mehrere Ströme und einzeln verarbeiten

--abca - bbb - ein

Spaltung in

--a ----- ------- a a -> ein Strom

- --- b ------ bbb --- -> b Strom

------ c ---------- -> c Strom

Dann , kann

a.subscribe() 
b.subscribe() 
c.subscribe() 
sein

Bis jetzt hat alles, was ich gefunden habe, den Stream mit einer groupBy() geteilt, aber dann alles in einen einzigen Stream zurückgefaltet und alle in der gleichen Funktion verarbeitet. Ich möchte jeden abgeleiteten Stream anders verarbeiten.

Die Art, wie ich es gerade mache, macht eine Reihe von Filtern. Gibt es einen besseren Weg, dies zu tun?

Antwort

7

Sie müssen Observables von groupBy nicht zusammenbrechen. Sie können sie stattdessen abonnieren.

Etwas wie folgt aus:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"}; 

    Action1<String> a = s -> System.out.print("-a-"); 

    Action1<String> b = s -> System.out.print("-b-"); 

    Action1<String> c = s -> System.out.print("-c-"); 

    Observable 
      .from(inputs) 
      .groupBy(s -> s) 
      .subscribe((g) -> { 
       if ("a".equals(g.getKey())) { 
        g.subscribe(a); 
       } 

       if ("b".equals(g.getKey())) { 
        g.subscribe(b); 
       } 

       if ("c".equals(g.getKey())) { 
        g.subscribe(c); 
       } 
      }); 

Wenn Aussagen irgendwie hässlich aussehen, aber zumindest kann man jeden Strom separat behandeln. Vielleicht gibt es eine Möglichkeit, sie zu vermeiden.

+0

Ja, ich möchte vermeiden, dass diese wenns möglicherweise. Wenn es jedoch funktioniert, wird es etwas sauberer aussehen, da alles an einem Ort statt Filter für den ursprünglichen Stream. Vielen Dank! –

+0

Arbeitete wie ein Charme! –

+0

Cool! Ich werde meine Antwort aktualisieren, wenn ich herausfinde, wie ich "if" -Aussagen los werde. – ihuk

31

Leicht, benutzen Sie einfach filter

Ein Beispiel in scala

import rx.lang.scala.Observable 

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a") 
val hotO: Observable[String] = o.share 
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a") 
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b") 
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c") 

aSource.subscribe(o ⇒ println("A: " + o), println,() ⇒ println("A Completed")) 

bSource.subscribe(o ⇒ println("B: " + o), println,() ⇒ println("B Completed")) 

cSource.subscribe(o ⇒ println("C: " + o), println,() ⇒ println("C Completed")) 

Sie müssen nur sicherstellen, dass die Quelle beobachtbar ist heiß. Der einfachste Weg ist share es.

+2

Was ist, wenn Sie wollen oder anfänglich beobachtbar kalt sein? –

+7

@double_squeeze verwenden Sie einfach 'publish' anstelle von' share' und rufen Sie 'connect' auf, wenn alle subsribers abonniert sind. –

+0

Upvoted wegen Kommentar von @Krzysztof Skyrzynecki – CrazyBS

Verwandte Themen