Sie für die einen Single-Threaded-Scheduler arbeiten müssen und Sie können eine über Schedulers.from(Executor)
oder neue SingleScheduler
(Warnung: intern) erhalten.
Das Problem mit unsubscribeOn
ist, dass es nur ausgeführt wird, wenn der Downstream den Stream tatsächlich disponiert/löscht (im Gegensatz zu 1.x, wo dies fast immer passiert).
Ein besserer Weg ist using
mit dem oben genannten benutzerdefinierten Scheduler zu verwenden und manuell die Bereinigung planen:
Observable<T> createObservable(Scheduler scheduler) {
return Observable.create(s -> {
Resource res = ...
s.setCancellable(() ->
scheduler.scheduleDirect(() ->
res.close() // may need try-catch here
)
);
s.onNext(...);
s.onComplete();
}).subscribeOn(scheduler);
}
Scheduler scheduler = new SingleScheduler();
createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);
Aber beachten Sie, dass es sei denn, die create
Logik gibt den Scheduler (durch Beenden oder gehen async), die Annullierung Die Logik wird möglicherweise niemals aufgrund des Livelocks des gleichen Pools ausgeführt.
Haben Sie observerOper operator vor diesem setDisposable versucht? – paul
@paul Ich glaube, du hast meine Frage missverstanden. Ich bin nur daran interessiert, den Scheduler zwischen unsubscribeOn und subscribeOn zu koppeln. – zoltish