2016-11-09 7 views
0

Ich habe eine Observable<String>, die etwas Arbeit leistet. Nach seinem getan, schließt er seine Verbindungen (via .setDisposable(Disposables.fromAction(logic*);. Das Problem ist, ich brauche diese enge Aktion auf dem gleichen Scheduler wie die tatsächliche Arbeitsbelastung durchgeführt werden.RxJava 2: Immer abmelden auf dem .subscribeOn (..) Scheduler?

Ist es möglich? Wie?

Ich mag nicht die beobachtbare zu zwingen, an einem bestimmten Scheduler auszuführen, zBmyObservable.subscribeOn(x).unsubscribeOn(x);;

+0

Haben Sie observerOper operator vor diesem setDisposable versucht? – paul

+0

@paul Ich glaube, du hast meine Frage missverstanden. Ich bin nur daran interessiert, den Scheduler zwischen unsubscribeOn und subscribeOn zu koppeln. – zoltish

Antwort

1

Sie für die einen Single-Threaded-Scheduler arbeiten müssen und Sie können eine über Schedulers.from(Executor) oder neue SingleScheduler (Warnung: intern) erhalten.

Das Problem mit unsubscribeOn ist, dass es nur ausgeführt wird, wenn der Downstream den Stream tatsächlich disponiert/löscht (im Gegensatz zu 1.x, wo dies fast immer passiert).

Ein besserer Weg ist using mit dem oben genannten benutzerdefinierten Scheduler zu verwenden und manuell die Bereinigung planen:

Observable<T> createObservable(Scheduler scheduler) { 
    return Observable.create(s -> { 
     Resource res = ... 

     s.setCancellable(() -> 
      scheduler.scheduleDirect(() -> 
       res.close() // may need try-catch here 
      ) 
     ); 

     s.onNext(...); 
     s.onComplete(); 
    }).subscribeOn(scheduler); 
} 

Scheduler scheduler = new SingleScheduler(); 

createObservable(scheduler) 
.map(...) 
.filter(...) 
.subscribe(...); 

Aber beachten Sie, dass es sei denn, die create Logik gibt den Scheduler (durch Beenden oder gehen async), die Annullierung Die Logik wird möglicherweise niemals aufgrund des Livelocks des gleichen Pools ausgeführt.

+0

Leider habe ich keinen Zugriff auf den Scheduler - es ist nur in der "Eltern" beobachtbar: z. Observable.combineLatest (obs1, obs2) .subscribeOn (...) wobei obs1 oder obs2 in Ihrer Antwort beobachtbar wäre. – zoltish

+0

Dann müssen Sie Ihren Code einschließlich der Eltern neu gestalten. – akarnokd

+0

@akarnokd Ich habe den gleichen Pool-Livelock, den du erwähnt hast: mein Code ist blockiert auf einer Sync 'input.readLine()' daher kann ich den Thread nicht freigeben, so dass ich ihn verwenden kann, um den kündbaren aufzurufen. Irgendwelche Gedanken? – ESala

Verwandte Themen