Wir verwenden mehrere Dienste in unserer Android App. Diese Dienste stellen ihre Daten als unendlich Observables
dar, die oft durch Kombination der Observables
anderer Dienste erstellt werden. Die Konstruktion dieser Observables
kann teuer sein. Darüber hinaus werden die Dienste oft an mehreren Orten konsumiert, so dass ihre Observable
unter den Teilnehmern geteilt werden sollte.Wie man freigegebene, unendliche Observables mit einer Verzögerung nach dem letzten Teilnehmer abmelden
Beispiel:
LocationService
liefert eine unendlicheObservable<Location>
, dieReminderService
den aktuellen Standort sendet, liefert eine unendlicheObservable<List<Reminder>>
, welche die Liste aller gespeicherten Erinnerungen nach jeder Änderung im Datensatz aussendet,
LocationAwareReminderService
, bietet eine unendlicheObservable<List<Reminders>>
von in der Nähe Erinnerungen vonObservable.combineLatest
dieObservables
der beiden vorherigen Dienste
Erster Ansatz: interne BehaviorSubjects als Cache
Jeden Dienst kombiniert die verbrauchte Observables
und abonniert es BehaviorSubject
das resultierende Futter intern ist. Verbraucher können diese dann unter BehaviorSubject
abonnieren. Die LocationAwareReminderService
zum Beispiel:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).subscribe(cache);
feed = cache.asObservable();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Nachteil:
- wegen des Verhaltens unterliegt die Zuläufe der reminderService und die locatoinService nie teared nach unten. Auch wenn kein Verbraucher
- ist ist dies besonders problematisch, wenn sie auf Dienstleistungen abhängen, wie die LocationService, dass die Veröffentlichung neue Objekte hält häufig
- wegen der abonnieren (Cache) im Constructor der Dienst in der Nähe Erinnerungen zu calcuate beginnt sogar wenn kein Teilnehmer vorhanden
Vorteil:
- das resultierende Futter von allen Teilnehmern gemeinsam genutzt wird
- weil das Futter nie teared ist nach unten, kurze Zeiträume ohne einen Abonnenten kollabieren nicht die ganze Leitung
Zweite Methode: replay (1) .refCount().
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).replay(1).refCount();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Nachteil:
- kurze Zeiträume ohne
Subscriber
das ganze Rohr kollabieren. Während des nächsten Abonnements muss das gesamte Rohr rekonstruiert werden. - A Übergänge von A nach
Activity
Activity
B, die beide Zeichnung derLocationAwareReminderService.getFeed()
, führt zu einem vollständigen De- und Rekonstruktion des Rohres
Vorteil:
- nach dem letzten
Subscriber
unsubscribed, dieLocationAwareReminderService
wird auch von derLocationService.getFeed()
undreminderService.getFeed()
Observables
abmelden. - Die
LocationAwareReminderService
beginnt erst nearbyReminders zu liefern, nachdem die erstenSubscriber
- das resultierende Futter durch all
dritten Ansatz s Subscriber
geteilt wird gezeichnet: die refcount mit einem Timeout
Deshalb baue ich abmelden machen eine Transformer
, die das Abonnement für einen definierten Zeitraum nach dem letzten Subscriber
abbestellen hält
public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {
private long keepAlive;
private TimeUnit timeUnit;
public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
this.keepAlive = keepAlive;
this.timeUnit = timeUnit;
}
@Override
public Observable<T> call(Observable<T> upstream) {
final Observable<T> sharedUpstream = upstream.replay(1).refCount();
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
if (subscriber.isUnsubscribed())
return;
// subscribe an empty Subscriber that keeps the subsription of refCount() alive
final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
// listen to unsubscribe from the subscriber
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// the subscriber unsubscribed
Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
@Override
public void call(Long _) {
// unsubscribe the keep alive subscription
keepAliveSubscription.unsubscribe();
}
});
}
}));
sharedUpstream.subscribe(subscriber);
}
});
}
public class NopSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(T o) {}
}
}
Die LocationAwareReminderService
den Vorteil RxPublishTimeoutCache
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Verwendung:
- Die
LocationAwareReminderService
beginnt erst nearbyReminders zu liefern, nachdem der ersteSubscriber
- das resultierende Futter von allen Teilnehmern gemeinsam genutzt wird abonniert
- kurze Zeiträume ohne einen Teilnehmer kollabieren nicht die ganze Leitung
- wird das gesamte Rohr teared nach unten, nachdem
Nachteil kein Abonnement für den festgelegten Zeitraum gab es:
- Vielleicht sind einige allgemeine Fehler?
Fragen:
- Gibt es bereits eine andere Möglichkeit, dies in RxJava zu erreichen?
- Gibt es einen allgemeinen Konstruktionsfehler in der
RxPublishTimeoutCache
? - Ist die Gesamtstrategie, solche Dienste mit RxJava zu komponieren, fehlerhaft?
Die dritte Option kündigt nach der Zeit die KeepAliveSubscription egal was. Im Idealfall sollte die keepAliveSubscription nur dann abgemeldet werden, wenn keine Abonnenten existieren, sodass eine Art der Abonnentenzählung gut sein könnte. Siehe meine Antwort. –
Danke für den Blick darauf. In Bezug auf die * Abmeldungen nach der Zeit die keepAliveSubscription egal was *, die 'keepAliveSubscription' kündigt nur den' replay (1) .refCount() 'Teil ab. Wenn weitere "Subscription" aktiv sind, wird der Upstream-Teil nicht abgemeldet, oder fehlt mir etwas? Ich denke, Ihr Anliegen war die mehrfache 'keepAliveSubscription', die von refCount verwaltet werden muss? –