2017-04-15 2 views
0

Ich versuche, Beispielcode aus dem Abschnitt "Trennen" here zu replizieren.RxJava - ConnectableObservable, Trennen und erneutes Verbinden

Abklemmen

Wie wir in connect Unterschrift gesehen, gibt diese Methode ein Abonnement, wie Observable.subscribe tut. Sie können diese Referenz verwenden, um das Abonnement von ConnectableObservable zu beenden. Dadurch werden Ereignisse nicht mehr an Beobachter weitergegeben, sie werden jedoch nicht von ConnectableObservable abgemeldet. Wenn Sie connect erneut aufrufen, startet ConnectableObservable ein neues Abonnement, und die alten Beobachter erhalten wieder Werte.

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
Subscription s = connectable.connect(); 

connectable.subscribe(i -> System.out.println(i)); 

Thread.sleep(1000); 
System.out.println("Closing connection"); 
s.unsubscribe(); 

Thread.sleep(1000); 
System.out.println("Reconnecting"); 
s = connectable.connect(); 

Ausgabe

0 
1 
2 
3 
4 
Closing connection 
Reconnecting 
0 
1 
2 
... 

Mit RxJava 2.0.8, die ich habe:

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
    Disposable s = connectable.connect(); 

    connectable.subscribe(new Observer<Long>() { 
     @Override 
     public void onSubscribe(Disposable d) { 

     } 

     @Override 
     public void onNext(Long aLong) { 
      Log.d("test", "Num: " + aLong); 
     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onComplete() { 

     } 
    }); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Closing connection"); 
    s.dispose(); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Reconnecting..."); 
    connectable.connect(); 

Ausgabe

Num: 0 
Num: 1 
Num: 2 
Num: 3 
Num: 4 
Closing connection 
Reconnecting... 

Vielen Dank im Voraus ....

+0

Ich bin nicht zu verstehen, Ihr Problem – Cochi

+0

@Cochi in meinem Code, mein Abonnent erhält keine Werte, nachdem die verbindbare Quelle getrennt wurde, dann wieder verbunden. – veritas1

Antwort

4

Es scheint, dass dieses Verhalten nicht von RxJava übernommen wurde. Das Arbeitsbeispiel stammt von Rx.NET. Siehe https://github.com/ReactiveX/RxJava/issues/4771

+0

Wahrscheinlich ja. Das einzige Mal, wenn ich es schaffen kann, ist es, wenn ich auch 'subscribe()' erneut annehme, bevor 'connect() 'erneut aufgerufen wird. Soll es so funktionieren, wenn 'dispose()' einmal aufgerufen wird? –

Verwandte Themen