2

Ich habe eine beobachtbare von BlockCollection, dass ich wie eine Schlange zu ihmReactive beobachtbare Abonnement: Stop Abonnement und Abonnement erneuern

IObservable<ProcessHoldTransactionData> GetObservable() 
{  
    _queue.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default); 
} 

und abonnieren verwenden:

void StartSubscription() 
{ 
    _subscription = = GetObservable().Subscribe(
       data => OnNextSubscribe(data), 
       ex => _logger.Error("Error"), 
       () => _logger.Warn("Complete")); 
} 

jetzt habe ich eine andere beobachtbare:

var timer = Observable.Interval(TimeSpan.FromSeconds(60)); 
_subscriptionTimer = timer.Subscribe(tick => 
{ 
    OnTimerNextSubscribe(); 
}); 

Ich möchte, wenn die OnTimerNextSubscribe starten, um das Abonnement von _subscription zu stoppen und erneuern, wenn der OnTimerNextSubscribe beendet.

Was die beste paractice dazu?
Sollte ich entsorgen die _subscription und rufen StartSubscription()

Antwort

0

Es gibt grundsätzlich zwei Möglichkeiten: Man Verfügungs dann neu starten, das andere ist eine Art von Ein/Aus-Signal beobachtbar zu erstellen, dann filtern _subscription entsprechend :

void StartSubscription(Observable<bool> onOffSignal) 
{ 
    _subscription = = GetObservable() 
     .WithLatestFrom(onOffSignal, (s, b) => b ? Observable.Return(s) : Observable.Empty(s)) 
     .Merge() 
     .Subscribe(
       data => OnNextSubscribe(data), 
       ex => _logger.Error("Error"), 
       () => _logger.Warn("Complete") 
     ); 
}