2016-04-23 7 views
7

Ich habe folgendes Problem:RxJava teilen sich eine Emissionen des beobachtbaren zwischen mehreren Teilnehmern

Ich habe einen beobachtbaren, dass einige Arbeit zu tun, aber auch andere Observablen brauchen die Ausgabe dieses beobachtbaren zu arbeiten. Ich habe versucht, mehrere Male auf dem gleichen Observablen zu subskribieren, aber innerhalb des Logs sehe ich, dass das ursprüngliche Observable mehrfach initiiert wird.

das ist meine beobachtbaren das ist Erstellen Sie das Objekt:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> { 
      if (mApi == null) { 
       //do some work 
      } 
      subscriber.onNext(mApi); 
      subscriber.unsubscribe(); 
     }) 

das ist meine beobachtbar, dass das Objekt

loadApi().flatMap(api -> api....())); 

Ich verwende

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread()) 
       .unsubscribeOn(Schedulers.io() 

auf allen Observablen muss.

Antwort

12

Ich bin mir nicht sicher, ob ich Ihre Frage richtig verstanden habe, aber ich denke, Sie suchen nach einer Möglichkeit, die Emissionen eines Observablen zwischen mehreren Teilnehmern zu teilen. Es gibt mehrere Möglichkeiten, dies zu tun. Zum einen könnte man ein Connectable Observable wie so verwenden:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish(); 
obs.subscribe(item -> System.out.println("Sub A: " + item)); 
obs.subscribe(item -> System.out.println("Sub B: " + item)); 
obs.connect(); //Now the source observable starts emitting items 

Ausgang:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

Alternativ können Sie verwenden, um eine PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject 
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject 
subject.subscribe(item -> System.out.println("Sub B: " + item));  
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable 

Ausgang:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

Beide o f Diese Beispiele sind single threaded, aber Sie können die Aufrufe von observeOn oder subscirbeOn einfach hinzufügen, um sie asynchron zu machen.

+0

Ja, ich möchte das erste Ergebnis aus dem beobachtbaren auf jeder – H3x0n

+0

abonnieren bekommen können was Ich sollte tun, wenn ich nicht weiß, wie oft mein Observable abonniert wird und wann der Abonnent das Ergebnis erhalten soll, bevor alle abonniert sind. – H3x0n

+0

This ist so ziemlich beantwortet in [diese Frage] (http://stackoverflow.com/questions/35951942/single-observable-with-multiple-subscribers) – JohnWowUs

1

Zunächst einmal mit Observable.create ist schwierig und leicht zu falsch. Sie brauchen so etwas wie

Observable.create(subscriber -> { 
     if (mApi == null) { 
      //do some work 
     } 
     if (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(mApi); 
      subscriber.onCompleted(); 
      // Not subscriber.unsubscribe(); 
     }    
}) 

Sie

ConnectableObservable<Integer> obs = Observable.just(1).replay(1).autoConnect(); 

alle nachfolgenden Teilnehmer sollten die einzelnen ausgesandten Artikel

obs.subscribe(item -> System.out.println("Sub 1 " + item)); 
obs.subscribe(item -> System.out.println("Sub 2 " + item)); 
obs.subscribe(item -> System.out.println("Sub 3 " + item)); 
obs.subscribe(item -> System.out.println("Sub 4 " + item)); 
+0

Ich habe versucht, deins, aber die auf abonnieren wird nicht aufgerufen. Mein beobachtbares ist öffentliches statisches Finale. – H3x0n

+0

Haben Sie Ihre ursprüngliche Observable mit der .publish(). Replay() versucht. Autoconnect()? – JohnWowUs

+0

wenn ich nur .share() es funktioniert. – H3x0n

Verwandte Themen