2016-04-13 2 views
5

Wir verwenden mehrere Dienste in unserer Android App. Diese Dienste stellen ihre Daten als unendlich Observables dar, die oft durch Kombination der Observables anderer Dienste erstellt werden. Die Konstruktion dieser Observables kann teuer sein. Darüber hinaus werden die Dienste oft an mehreren Orten konsumiert, so dass ihre Observable unter den Teilnehmern geteilt werden sollte.Wie man freigegebene, unendliche Observables mit einer Verzögerung nach dem letzten Teilnehmer abmelden

Beispiel:

  • LocationService liefert eine unendliche Observable<Location>, die
  • ReminderService den aktuellen Standort sendet, liefert eine unendliche Observable<List<Reminder>>, welche die Liste aller gespeicherten Erinnerungen nach jeder Änderung
  • im Datensatz aussendet,
  • LocationAwareReminderService, bietet eine unendliche Observable<List<Reminders>> von in der Nähe Erinnerungen von Observable.combineLatest die Observables der beiden vorherigen Dienste

Erster Ansatz: interne BehaviorSubjects als Cache

Jeden Dienst kombiniert die verbrauchte Observables und abonniert es BehaviorSubject das resultierende Futter intern ist. Verbraucher können diese dann unter BehaviorSubject abonnieren. Die LocationAwareReminderService zum Beispiel:

public class LocationAwareReminderService { 

    Observable<List<Reminder>> feed; 

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) { 
     BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create(); 
     Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() { 
      @Override 
      public List<Reminder> call(List<Reminder> reminders, Location location) { 
       return calculateNearbyReminders(reminders, location); 
      } 
     }).subscribe(cache); 

     feed = cache.asObservable(); 
    } 

    public Observable<List<Reminder>> getFeed() { 
     return feed; 
    } 
} 

Nachteil:

  • wegen des Verhaltens unterliegt die Zuläufe der reminderService und die locatoinService nie teared nach unten. Auch wenn kein Verbraucher
  • ist ist dies besonders problematisch, wenn sie auf Dienstleistungen abhängen, wie die LocationService, dass die Veröffentlichung neue Objekte hält häufig
  • wegen der abonnieren (Cache) im Constructor der Dienst in der Nähe Erinnerungen zu calcuate beginnt sogar wenn kein Teilnehmer vorhanden

Vorteil:

  • das resultierende Futter von allen Teilnehmern gemeinsam genutzt wird
  • weil das Futter nie teared ist nach unten, kurze Zeiträume ohne einen Abonnenten kollabieren nicht die ganze Leitung

Zweite Methode: replay (1) .refCount().

public class LocationAwareReminderService { 

    Observable<List<Reminder>> feed; 

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) { 
     feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() { 
      @Override 
      public List<Reminder> call(List<Reminder> reminders, Location location) { 
       return calculateNearbyReminders(reminders, location); 
      } 
     }).replay(1).refCount(); 
    } 

    public Observable<List<Reminder>> getFeed() { 
     return feed; 
    } 
} 

Nachteil:

  • kurze Zeiträume ohne Subscriber das ganze Rohr kollabieren. Während des nächsten Abonnements muss das gesamte Rohr rekonstruiert werden.
  • A Übergänge von A nach ActivityActivity B, die beide Zeichnung der LocationAwareReminderService.getFeed(), führt zu einem vollständigen De- und Rekonstruktion des Rohres

Vorteil:

  • nach dem letzten Subscriber unsubscribed, die LocationAwareReminderService wird auch von der LocationService.getFeed() und reminderService.getFeed()Observables abmelden.
  • Die LocationAwareReminderService beginnt erst nearbyReminders zu liefern, nachdem die ersten Subscriber
  • das resultierende Futter durch all

dritten Ansatz s Subscriber geteilt wird gezeichnet: die refcount mit einem Timeout

Deshalb baue ich abmelden machen eine Transformer, die das Abonnement für einen definierten Zeitraum nach dem letzten Subscriber abbestellen hält

public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> { 

    private long keepAlive; 
    private TimeUnit timeUnit; 

    public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) { 
     this.keepAlive = keepAlive; 
     this.timeUnit = timeUnit; 
    } 

    @Override 
    public Observable<T> call(Observable<T> upstream) { 

     final Observable<T> sharedUpstream = upstream.replay(1).refCount(); 

     return Observable.create(new Observable.OnSubscribe<T>() { 
      @Override 
      public void call(Subscriber<? super T> subscriber) { 
       if (subscriber.isUnsubscribed()) 
        return; 
       // subscribe an empty Subscriber that keeps the subsription of refCount() alive 
       final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>()); 
       // listen to unsubscribe from the subscriber 
       subscriber.add(Subscriptions.create(new Action0() { 
        @Override 
        public void call() { 
         // the subscriber unsubscribed 
         Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() { 
          @Override 
          public void call(Long _) { 
           // unsubscribe the keep alive subscription 
           keepAliveSubscription.unsubscribe(); 
          } 
         }); 
        } 
       })); 
       sharedUpstream.subscribe(subscriber); 
      } 
     }); 
    } 

    public class NopSubscriber<T> extends Subscriber<T> { 
     @Override 
     public void onCompleted() {} 
     @Override 
     public void onError(Throwable e) {} 
     @Override 
     public void onNext(T o) {} 
    } 
} 

Die LocationAwareReminderService den Vorteil RxPublishTimeoutCache

public class LocationAwareReminderService { 

    Observable<List<Reminder>> feed; 

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) { 
     feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() { 
      @Override 
      public List<Reminder> call(List<Reminder> reminders, Location location) { 
       return calculateNearbyReminders(reminders, location); 
      } 
     }).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS)); 
    } 

    public Observable<List<Reminder>> getFeed() { 
     return feed; 
    } 
} 

Verwendung:

  • Die LocationAwareReminderService beginnt erst nearbyReminders zu liefern, nachdem der erste Subscriber
  • das resultierende Futter von allen Teilnehmern gemeinsam genutzt wird abonniert
  • kurze Zeiträume ohne einen Teilnehmer kollabieren nicht die ganze Leitung
  • wird das gesamte Rohr teared nach unten, nachdem

Nachteil kein Abonnement für den festgelegten Zeitraum gab es:

  • Vielleicht sind einige allgemeine Fehler?

Fragen:

  • Gibt es bereits eine andere Möglichkeit, dies in RxJava zu erreichen?
  • Gibt es einen allgemeinen Konstruktionsfehler in der RxPublishTimeoutCache?
  • Ist die Gesamtstrategie, solche Dienste mit RxJava zu komponieren, fehlerhaft?
+0

Die dritte Option kündigt nach der Zeit die KeepAliveSubscription egal was. Im Idealfall sollte die keepAliveSubscription nur dann abgemeldet werden, wenn keine Abonnenten existieren, sodass eine Art der Abonnentenzählung gut sein könnte. Siehe meine Antwort. –

+0

Danke für den Blick darauf. In Bezug auf die * Abmeldungen nach der Zeit die keepAliveSubscription egal was *, die 'keepAliveSubscription' kündigt nur den' replay (1) .refCount() 'Teil ab. Wenn weitere "Subscription" aktiv sind, wird der Upstream-Teil nicht abgemeldet, oder fehlt mir etwas? Ich denke, Ihr Anliegen war die mehrfache 'keepAliveSubscription', die von refCount verwaltet werden muss? –

Antwort

1

Ich dachte, das ein interessantes Problem und schien, war ein nützlichen Bediener zu haben, so habe ich Transformers.delayFinalUnsubscribe in rxjava-extras:

observable 
    .publish() 
    .refCount() 
    .compose(Transformers 
     .delayFinalUnsubscribe(1, TimeUnit.MINUTES)); 

es verfügbar ist in rxjava-Extras von 0.7.9.1 auf Maven Central. Geben Sie eine Drehung, wenn Sie mögen und sehen Sie, ob es irgendwelche Probleme gibt.

+0

Danke für die Implementierung. Auf den ersten Blick ist der einzige Nachteil, den ich feststellen konnte, dass das 'delayFinalUnsubscribe' mehrere * Upstream * -Abonnements verursacht, wenn jemand vergisst,' publish(). RefCount() 'vorher zu verwenden. In einem solchen Fall würde das erste Abonnement aktiv bleiben, bis der letzte Teilnehmer sich abmeldet. Dies könnte zu einem wirklich merkwürdigen Verhalten führen. Bei richtiger Anwendung sollte das Problem jedoch gelöst werden. –

+0

Ja, das stimmt. Ohne 'refCount' voran, gibt es nicht viel Sinn darin, es zu benutzen, erwarte ich. Das Javadoc erklärt, was vor sich geht, hoffentlich wird niemand darüber verwirrt. Ich könnte dem Javadoc explizit Refcount hinzufügen. Übrigens gibt es ein offensichtliches Problem mit dem Operator in dem Fall, wenn jemand sich sofort vollständig für die Bereinigung abmelden möchte. API-weise wurde es hässlich, aber könnte gemacht werden. Wie lange beabsichtigen Sie, die endgültige Abmeldung in Ihrem Anwendungsfall zu verzögern? –

+0

Ich habe gerade überprüft. Das längste Zeitlimit, das wir derzeit verwenden, beträgt 1 Minute (nur für einen Dienst und wahrscheinlich werden wir es auf 30 Sekunden reduzieren). Aber am häufigsten seine 5,10 oder 30 Sekunden. Die Entscheidung basiert normalerweise darauf, wie häufig auf den Dienst zugegriffen wird und wie teuer das Setup ist. Bis jetzt brauchten wir keine Funktion um * abzubestellen * zu erzwingen, ohne auf die Zeitüberschreitung zu warten. Außerdem könnte dieser Transformer sehr hilfreich sein, um Orientierungsänderungen zu handhaben. Fürs Erste behalten wir den 'RxPublishTimeoutCache' bei, bis wir Zeit finden, zu rxjava-extras's delayFinalUnsubscribe zu migrieren –

Verwandte Themen