2016-06-29 9 views
0

Ich habe eine Observable erstellt, die die ViewPager's positionOffset emittiert.Wie teilt man eine erstellte Observable?

public Observable<Float> observable() { 

    return Observable.create(new Observable.OnSubscribe<Float>() { 

     @Override 
     public void call(final Subscriber<? super Float> subscriber) { 

      if (viewPager == null) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(new IllegalStateException("viewPager is null")); 
       } 
       return; 
      } 

      final ViewPager.OnPageChangeListener listener = new ViewPager.OnPageChangeListener() { 

       @Override 
       public void onPageScrolled(final int position, 
              final float positionOffset, 
              final int positionOffsetPixels) { 

        if (!subscriber.isUnsubscribed()) { 
         subscriber.onNext(positionOffset); 
        } 
       } 

       @Override 
       public void onPageSelected(final int position) { 

       } 

       @Override 
       public void onPageScrollStateChanged(final int state) { 

       } 
      }; 
      viewPager.addOnPageChangeListener(listener); 

      subscriber.add(Subscriptions.create(() -> viewPager.removeOnPageChangeListener(listener))); 
     } 
    }); 
} 

Es funktioniert so weit.

Aber mit jedem Abonnement erstellt es ein neues ViewPager.OnPageChangeListener und fügt es dem ViewPager hinzu.

Gibt es eine Möglichkeit, dass alle Abonnements die gleiche Observable teilen, dass der ViewPager nur einen Listener hat?

Antwort

0

Sie können Observable.share() verwenden, das ist eine gute Option, wenn Sie mehrere Abonnenten haben und nichts dagegen haben, wenn späte Abonnenten einige Benachrichtigungen verlieren können.

Wenn Sie möchten, dass alle Abonnenten dieselben Benachrichtigungen sehen, verwenden Sie Observable.publish() - es wird ConnectableObservable zurückgegeben, die beginnt, Elemente zu senden, sobald ihre connect()-Methode aufgerufen wird. So können Sie tun

connectable = originalObservable.publish(); 
connectable.subscribe(observer1); 
connectable.subscribe(observer2); 
connectable.connect(); 
+0

'Observable.share()' sieht gut aus :-) –

0

Sie können Connecable Observables verwenden. Zum Beispiel unter der Code unendlich (nie abgeschlossen) Strom von Ereignissen simuliert:

Observable<Float> o = Observable.concat(Observable.just(1f,2f,3f), Observable.never()); 

Sie einen zuschaltbaren beobachtbaren mit Replay mit refcount erstellen können, die den letzten Wert zwischengespeichert werden:

Observable<Float> shared = o.replay(1).refCount(); 

Ein erster Teilnehmer wird der beobachtbare „heiß“ machen und wird es Elemente zu produzieren zwingen:

shared.subscribe(new Action1<Float>() { 

    @Override 
    public void call(Float t) { 
    System.out.println("o1:" + t); 

    } 

}); 
Thread.sleep(1000); 

eine zweite beobachtbare nach einer Weile mit ihm verbinden kann, und wird aus dem aktuellen Punkt beginnen emittiert:

shared.subscribe(new Action1<Float>() { 

    @Override 
    public void call(Float t) { 
    System.out.println("o2:" + t); 

    } 

}); 

Thread.sleep(1000); 

Ausgang:

o1:1.0 
o1:2.0 
o1:3.0 
o2:3.0 
Verwandte Themen