2017-12-25 3 views
1

Ich kann keine klare Erklärung finden, wie RxJava Subscriber s mehrere Subscription s behandeln. (Anmerkungen zu RxJava 1.x beziehen)Subskribieren/Abbestellen mehrerer Abonnements mit einem Abonnenten

Subscriber eine add(Subscription) Methode hat, die "fügt die Subscription auf die Subscriber‚s Liste der Subscription s", mit keiner weiteren Erläuterung. Welche Regeln gelten für die Teilnahme an Subscription?

Interessanterweise haben beide Subscriber und Subscription eine unsubscribe() Methode. Vermutlich wird subscriber.unsubscribe() die gesamte Subscription Liste abbestellen, während subscription.unsubscribe() nur diese eine Subscription abbestellen wird, obwohl ich das nirgendwo nirgendwo finden kann. Ist das richtig?

Ich bin noch nicht in RxJava 2.x getaucht; Gibt es Änderungen daran (außer Subscription wird umbenannt)?

Antwort

0

etwas Licht in diese zu bringen, und auch verstehen, besser den Teilnehmer und Abonnement Ich habe dieses Beispiel

/** 
* In every moment we have the possibility to create our own subscriber, which you have to implement ActionSubscriber 
* with onNext, onError and onComplete functions. 
* Once that you do that you can attach that subscriber into a subscription. 
* 
* You can also can add the subscription into a subscriptionList that a subscriber has to know the state of the 
* subscriptions where he is part of 
*/ 
@Test 
public void subscriberAndSubscription() { 
    Integer[] numbers = {0, 1, 2}; 

    Subscriber subscriber = new ActionSubscriber(number -> { 
     try { 
      Thread.sleep(500); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println("Subscriber number:" + number); 
    }, 
      System.out::println, 
      () -> System.out.println("Subscriber End of pipeline")); 

    Subscription subscription = Observable.from(numbers).subscribeOn(Schedulers.newThread()).subscribe(subscriber); 
    Subscription subscription1 = Observable.from(numbers).subscribeOn(Schedulers.newThread()).subscribe(subscriber); 

    subscriber.add(subscription); 
    subscriber.add(subscription1); 
    System.out.println("Is Unsubscribed??:" + subscriber.isUnsubscribed()); 


    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(5, TimeUnit.SECONDS); 

    System.out.println("Is Unsubscribed??:" + subscriber.isUnsubscribed()); 

} 

Als ich die subscriber.add(subscription); ist ein Mechanismus zur Steuerung der Anzahl der Abonnements, wo mein Abonnent verstehen Teil es und der Zustand dieser Abonnements. So kann ich wissen, wann alle meine Abonnements beendet sind, um mich zu emittieren und abzumelden.

Ich habe einige Beispiele hier https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java

0

Subscriber.add() wird intern von RxJava 1 zu assoziieren Ressourcen wie Scheduler.Worker s, andere Subscriber s und Aktionen verwendet, die unsubscribed/ausgeführt werden müssen, wenn die Subscriber unsubscribed ist oder abgeschlossen ist.

Subscriber.unsubscribe() ist da, weil Subscriber ist auch ein Subscription, die sie miteinander und zueinander oben erwähnt für die Bereinigungsroutine hinzugefügt verkettet werden können.

Intern hat Subscriber eine Liste von Subscription s, die zusammen abbestellt werden können.

Als Endverbraucher der RxJava API müssen Sie sich keine Gedanken über add() machen. Diese Ressource-Container-Logik der Öffentlichkeit zugänglich zu machen und damit alle Subscriber s damit zu belasten, war jedoch nicht die ideale Lösung. 2.x hat die Architektur in overcome this property geändert.

In 2.x übernimmt die Schnittstelle die Rolle der Schnittstelle Subscription, aber der Standardwert Observer hostet keine Ressourcen mehr. Es liegt in der Verantwortung des Upstream, dies zu tun. Um das alte Ressourcencontainerverhalten zu unterstützen, wurde jedoch eine separate abstrakte Klasse ResourceObserver eingeführt.

+0

Danke, das hilft. Nur um klar zu sein, ist es wahr (in RxJava 1.x): (1) 'subscription.unsubscribe()' annulliert nur das eine "Subscription", während (2) 'subscriber.unsubscribe()' alle Subscriptions abbricht 's in der' Abonnenten'-Liste? –

+0

Warum wissen Sie, dass "subscription.unsubscribe()" über andere Instanzen von "Subscription" weiß? – akarnokd

+0

Das ist nicht wirklich das Problem. Das Problem ist: Ist es möglich, ein einzelnes "Abonnement" eines "Abonnenten" abzubestellen, während der Rest aktiv bleibt? –

Verwandte Themen