2016-05-18 8 views
0

Ich bin den folgenden Code aus hier mit - sieht aus wie ein Problem zu mir in die "Replay-Cache" ClearingRx Cache - Replay Operator löscht

https://gist.github.com/leeoades/4115023

Wenn ich ändere den folgenden Aufruf und Code So sehe ich, dass es einen Bug in Replay gibt, dh es wird nie gelöscht. Kann jemand bitte helfen, dies zu korrigieren?

private Cache<string> GetCalculator() 
    { 
     var calculation = Observable.Create<string>(o => 
     { 
      _calculationStartedCount++; 

      return Observable.Timer(_calculationDuration, _testScheduler) 
          .Select(_ => "Hello World!" + _calculationStartedCount) // suffixed the string with count to test the behaviour of Replay clearing 
          .Subscribe(o); 
     }); 

     return new Cache<string>(calculation); 
    } 

[Test] 
    public void After_Calling_GetResult_Calling_ClearResult_and_GetResult_should_perform_calculation_again() 
    { 
     // ARRANGE 
     var calculator = GetCalculator(); 

     calculator.GetValue().Subscribe(); 
     _testScheduler.Start(); 

     // ACT 
     calculator.Clear(); 

     string result = null; 
     calculator.GetValue().Subscribe(r => result = r); 
     _testScheduler.Start(); 

     // ASSERT 
     Assert.That(_calculationStartedCount, Is.EqualTo(2)); 
     Assert.That(result, Is.EqualTo("Hello World!2")); // always returns Hello World!1 and not Hello World!2 
     Assert.IsNotNull(result); 
    } 

Antwort

2

Das Problem ist ein subtiles. Die Quellsequenz Timer wird abgeschlossen, nachdem sie ein Ereignis ausgelöst hat, das wiederum OnCompleted auf dem internen ReplaySubject erstellt von Replay aufruft. Wenn ein Subject abgeschlossen ist, akzeptiert es keine neuen Werte mehr, auch wenn ein neuer Observable angezeigt wird.

Wenn Sie auf die darunter liegenden Observable resubscribe es wieder ausführt, ist aber die Subject nicht in der Lage neu zu starten, so dass Ihr neuen Observer nur den letzten Wert vor dem Abschluss der ReplaySubject empfangen kann.

Die einfachste Lösung wäre wahrscheinlich nur nie der Quellenstrom vollständig (ungetestet) lassen:

public Cache(IObservable<T> source) 
    { 
     //Not sure why you are wrapping this in an Observable.create 
     _source = source.Concat(Observable.Never()) 
          .Replay(1, Scheduler.Immediate); 
    } 
Verwandte Themen