2016-10-21 4 views
2

Ich habe Probleme mit der Ausführung einiger Logik, wenn ein Abonnement abgemeldet wurde. Ich bin seit Stunden dabei und habe bisher wenig Fortschritte gemacht. Dies ist eine vereinfachte Version meiner Code:Löschen von Ressourcen beim Abmelden

public class Command<E> { 

    public CommandActionObservable execute() { 
     final CommandAction<E> command = createCommand(); 

     final OnSubscribe<CommandAction<E>> onSubscribe = (subscriber) -> { 

      /* Create a listener that handles notifications and register it. 
      * The idea here is to push the command downstream so it can be re-executed 
      */ 
      final Listener listener = (event) -> { 
       subscriber.onNext(command); 
      } 
      registerListener(listener); 

      /* This is where I'm having trouble. The unregister method 
      * should be executed when the subscriber unsubscribed, 
      * but it never happens 
      */ 
      subscriber.add(Subscriptions.create(() -> { 
       unregisterListener(listener); 
      })); 

      // pass the initial command downstream 
      subscriber.onNext(command); 

      kickOffBackgroundAction();    
     } 

     final Observable<CommandAction<E>> actionObservable = Observable.create(onSubscribe) 
      .onBackpressureLatest() 
      .observeOn(Shedulers.io()) 
      .onBackpressureLatest(); 
     return new CommandActionObservable((subscriber) -> { 
      actionObservable.unsafeSubscribe(subscriber); 
     }) 
    } 

    public class CommandActionObservable extends Observable<CommandAction<E> { 

     // default constructor omitted 

     public Observable<E> toResult() { 
      return lift((Operator) (subscriber) -> { 
       return new Subscriber<CommandAction<E>>() { 
        // delegate onCompleted and onError to subscriber 

        public void onNext(CommandAction<E> action) { 
         // execute the action and pass the result downstream 
         final E result = action.execute(); 
         subscriber.onNext(result) 
        } 
       } 
      } 
     } 

    } 

} 

Ich bin der Command in der üblichen Weise verwendet wird, Zugabe der erhaltenen Abonnement für eine CompositeSubscription und von ihm in onDestroy() abzumelden. Hier ein Beispiel:

final Observable<SomeType> obs = new Command<SomeType>() 
             .execute() 
             .toResult(); 
subscription.add(obs.subscribe(// impl here)); 



public void onDestroy() { 
    super.onDestroy(); 
    subscription.unsubscribe(); 
} 

Wie bereits erwähnt, habe ich nicht die Abmelde Logik bekommen kann, zu arbeiten und die Hörer deregistrieren, die Speicherlecks in der App verursacht. Wenn ich doOnUnsubscribe() auf obs rufe, wird es aufgerufen, also werde ich korrekt subskribieren, aber vielleicht verursacht die Verschachtelung der Observablen und das Anheben einige Probleme.

Ich würde mich freuen, Meinungen zu diesem Thema zu leiten.

Antwort

1

Es stellte sich heraus, es war viel einfacher als ich erwartet hatte.

Nach ein bisschen Graben konnte ich mir die Antwort selbst einfallen lassen. Sende das einfach für Leute, die in der gleichen Situation enden können wie ich.

Also, wie ich in meiner Frage erwähnt, wenn ich eine doOnSubscribe() Aktion auf die beobachtbare wurde ich in meinem Activity bekommen, wird es benachrichtigt. Als nächstes habe ich versucht, die gleiche Aktion auf der inneren Observables Ich erstelle in der execute() Methode hinzufügen. Sie wurden nicht angerufen. So kam ich zu dem Schluss, dass die Kette irgendwo zwischen dem Observablen in meiner Aktivität und den Observablen, die ich in execute() erstellte, unterbrochen wurde.

Die einzige Sache, die dem Strom geschah, war die Anwendung meiner Gewohnheit Operator, die in toResult() implementiert wurde. Nach einer Google-Suche stieß ich auf diesen hervorragenden Artikel - Pitfalls of Operator Implementation. Ich habe tatsächlich die Kette in meinem Operator gebremst und die vorgelagerten Observablen wurden nicht über die Abmeldung informiert.

Nachdem ich getan habe, was der Autor rät, ist alles gut. Hier ist, was ich tun musste:

lift((Operator) (subscriber) -> { 
    // connect the upstream and downstream subscribers to keep the chain intact 
    new Subscriber<CommandAction<E>>(subscriber) { 
     // the implementation is the same 
    } 
} 
Verwandte Themen