2016-03-11 17 views
20

Ich habe eine Observable<<List<Foo>> getFoo(), die von einem Retrofit Service erstellt wird und nach dem Aufruf der .getFoo() Methode muss ich es mit mehreren Abonnenten teilen. Wenn die Methode .share() aufgerufen wird, wird der Netzwerkaufruf jedoch erneut ausgeführt. Replay Operator funktioniert auch nicht. Ich weiß, dass eine mögliche Lösung .cache() sein könnte, aber ich weiß nicht, warum dieses Verhalten verursacht wird.Single Observable mit mehreren Abonnenten

Retrofit retrofit = new Retrofit.Builder() 
      .baseUrl(API_URL) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build(); 

    // Create an instance of our GitHub API interface. 

    // Create a call instance for looking up Retrofit contributors. 

    Observable<List<Contributor>> testObservable = retrofit 
      .create(GitHub.class) 
      .contributors("square", "retrofit") 
      .share(); 


    Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable throwable) { 

     } 

     @Override 
     public void onNext(List<Contributor> contributors) { 
      System.out.println(contributors); 
     } 
    }); 

    Subscription subscription2 = testObservable 
      .subscribe(new Subscriber<List<Contributor>>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable throwable) { 

       } 

       @Override 
       public void onNext(List<Contributor> contributors) { 
        System.out.println(contributors + " -> 2"); 
       } 
      }); 

    subscription1.unsubscribe(); 
    subscription2.unsubscribe(); 

Der obige Code kann das oben genannte Verhalten reproduzieren. Sie können es debuggen und sehen, dass die empfangenen Listen zu einer anderen MemoryAddress gehören.

Ich habe auch ConnecableObservables als eine mögliche Lösung betrachtet, aber dies erfordert, dass ich das Original Observable herumtragen und jedes Mal, wenn ich einen neuen Abonnenten hinzufügen möchte, .connect() aufrufen.

Diese Art von Verhalten mit der .share() funktionierte gut bis Retrofit 1.9. Es hat aufgehört, an Retrofit 2 - beta zu arbeiten. Ich habe es noch nicht mit der Retrofit 2 Release Version getestet, die vor einigen Stunden veröffentlicht wurde.

EDIT: 01/02/2017

Für zukünftige Leser, ich habe einen Artikel geschrieben here mehr über den Fall zu erklären!

Antwort

23

Sie scheinen (implizit) Casting Ihre ConnectedObservable von .share() zurück in eine normale Observable. Vielleicht möchten Sie sich über den Unterschied zwischen heißen und kalten Observablen informieren.

Versuchen

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share(); 

Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable throwable) { 

    } 

    @Override 
    public void onNext(List<Contributor> contributors) { 
     System.out.println(contributors); 
    } 
}); 

Subscription subscription2 = testObservable 
     .subscribe(new Subscriber<List<Contributor>>() { 
      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(Throwable throwable) { 

      } 

      @Override 
      public void onNext(List<Contributor> contributors) { 
       System.out.println(contributors + " -> 2"); 
      } 
     }); 

testObservable.connect(); 
subscription1.unsubscribe(); 
subscription2.unsubscribe(); 

Edit: Sie müssen connect() nicht rufen jedes Mal, wenn ein neues Abonnement möchten Sie es brauchen die beobachtbare zu starten. Ich nehme an, Sie replay() sicherstellen nutzen könnten alle nachfolgenden Abonnenten alle Einzelteile erhalten produziert

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share() 
     .replay() 
+0

Vielen Dank für Ihre Antwort. Die Sache ist, dass ich wirklich vermeiden möchte, jedes Mal Verbindung anzurufen. Sind Sie sicher, dass der Replay-Operator mit diesem Anwendungsfall gut funktioniert? – Pavlos

+0

Eigentlich habe ich es getestet und es hat funktioniert. Vielen Dank für Ihre Zeit und Ihre Antwort. Nur um des Problems willen habe ich den Unterschied zwischen den heißen und kalten Observablen gelesen, konnte es aber nicht mit den Netzwerkaufrufen mit Retrofit reproduzieren. Wenn ich eine Observable.just() verwendet, funktioniert der Share-Operator ziemlich gut. – Pavlos

30

Nachdem wieder mit RxJava Entwickler Dávid Karnok Überprüfung Ich möchte eine vollständige Erklärung vorzuschlagen, was hier vorging.

share() ist definiert als publish().refCount(), i. e. Die Quelle Observable wird zuerst in eine ConnectableObservable von publish() umgewandelt, aber anstatt connect() "manuell" aufzurufen, wird dieser Teil von refCount() behandelt. Insbesondere ruft refCountconnect() auf dem ConnectableObservable, wenn es selbst das erste Abonnement erhält; Solange es mindestens einen Abonnenten gibt, bleibt er abonniert. und schließlich, wenn die Anzahl der Teilnehmer auf 0 fällt, wird sie sich nach oben abmelden. Mit kaltenObservables, wie die von Retrofit zurückgegeben werden, stoppt dies alle laufenden Berechnungen.

Wenn nach einem dieser Zyklen ein anderer Teilnehmer kommt, wird refCount erneut connect aufrufen und damit eine neue Subskription für die Quelle Observable auslösen. In diesem Fall wird eine weitere Netzwerkanforderung ausgelöst.

Nun wurde dies normalerweise nicht mit Retrofit 1 (und tatsächlich jede Version vor this commit) offensichtlich, weil diese älteren Versionen von Retrofit standardmäßig alle Netzwerkanforderungen in einen anderen Thread verschoben haben.Dies bedeutete normalerweise, dass alle Ihre subscribe() Anrufe passieren würden, während die erste Anfrage/Observable noch lief und daher würde die neue Subscriber s einfach zu refCount hinzugefügt werden und würde daher keine zusätzlichen Anfragen auslösen/Observables.

Neuere Versionen von Retrofit verschieben die Arbeit jedoch nicht standardmäßig in einen anderen Thread - Sie müssen dies explizit tun, indem Sie beispielsweise subscribeOn(Schedulers.io()) aufrufen. Wenn Sie nicht, wird alles auf dem aktuellen Thread bleiben, was bedeutet, dass die zweite subscribe() wird nur passieren, nachdem die erste Observable hat onCompleted aufgerufen und damit schließlich Subscribers haben sich abgemeldet und alles ist heruntergefahren. Jetzt, wie wir im ersten Absatz sahen, hat der zweite subscribe(), share() hat keine andere Wahl, als eine andere Subscription an die Quelle Observable zu verursachen und eine weitere Netzwerkanforderung auszulösen.

Um zu dem Verhalten zurückzukehren, das Sie von Retrofit 1 gewohnt sind, fügen Sie einfach subscribeOn(Schedulers.io()) hinzu.

Dies sollte dazu führen, dass nur die Netzwerkanforderung ausgeführt wird - die meiste Zeit. Im Prinzip könnten Sie jedoch mehrere Anfragen erhalten (und Sie könnten immer mit Retrofit 1), aber nur, wenn Ihre Netzwerk-Anfragen extrem schnell sind und/oder die subscribe() Anrufe mit beträchtlicher Verzögerung passieren, so dass wieder die erste Anfrage ist beendet, wenn der zweite subscribe() passiert.

Daher schlägt Dávid vor, entweder cache() zu verwenden (aber es hat die genannten Nachteile) oder replay().autoConnect(). Nach diesen release notes, autoConnect funktioniert wie nur die erste Hälfte der refCount, oder genauer gesagt, ist es

ähnlich im Verhalten refcount(), mit der Ausnahme, dass es nicht nicht trennen, wenn Abonnent verloren.

Dies bedeutet, dass die Anforderung würde nur dann ausgelöst werden, wenn die ersten subscribe() geschieht aber dann später Subscriber s würde alle emittierten Produkte erhalten, unabhängig davon, ob gab es, jederzeit zwischendurch, 0 Abonnenten.

+1

Vielen Dank für Ihre ausführliche Erklärung. Ich werde es mir merken, obwohl ich jetzt ziemlich sicher bin, dass ich ein paar bequeme Lösungen für dieses Problem habe :) – Pavlos

+2

Großartig! Ich wollte diese Erklärung nur hinzufügen, weil "replay", "share", "publish" usw. kompliziert genug sind und es nicht schadet, detaillierte Erklärungen zu Randfällen zu haben. –

+2

Großartige Antwort in der Tat :) –

Verwandte Themen