2013-02-11 8 views
5

Wie kann man in RX eine einfache, stateful Transformation einer Sequenz machen?RX: Stateful Transformation der Sequenz, z.B. exponentieller gleitender Durchschnitt

Angenommen, wir wollen eine exponentielle gleitende Durchschnittsumwandlung einer IObservable-NoiseSequence machen.

Wenn noisySequence Zecken, emaSequence sollte den Wert (previousEmaSequenceValue * (1-Lambda) + latestNoisySequenceValue * lambda) kreuzen und das Rück

Ich denke, wir Themen verwenden, aber wie genau?

public static void Main() 
    { 

     var rand = new Random(); 

     IObservable<double> sequence = Observable 
      .Interval(TimeSpan.FromMilliseconds(1000)) 
      .Select(value => value + rand.NextDouble()); 

     Func<double, double> addNoise = x => x + 10*(rand.NextDouble() - 0.5); 

     IObservable<double> noisySequence = sequence.Select(addNoise); 

     Subject<double> exponentialMovingAverage = new Subject<double>(); // ??? 


     sequence.Subscribe(value => Console.WriteLine("original sequence "+value)); 
     noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value)); 
     exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value)); 

     Console.ReadLine(); 
    } 
+1

Um zu verdeutlichen, ich bin weniger interessiert an einer bestimmten Methode, um eine durchschnittliche, aber eher generische Möglichkeiten, um Stateful Transformationen zu machen. – Sputnik2513

Antwort

3

Für viele dieser Arten von Berechnungen, Buffer ist der einfachste Weg

var movingAverage = noisySequence.Buffer(/*last*/ 3, /*move forward/* 1 /*at a time*/) 
    .Select(x => (x[0] + x[1] + x[2])/3.0); 

Wenn Sie Zustand tragen müssen, verwenden Sie den Scan Operator, der wie Aggregate außer, dass sie alle Werte ergibt Iteration.

+0

Das funktioniert in diesem speziellen Beispiel eines Durchschnitts. Allgemeiner, wenn wir interne Zustände brauchen, wie man es am besten in RX macht? – Sputnik2513

+0

Auch das hat keine Zustände. Um einen exponentiellen gleitenden Durchschnitt zu implementieren, müssen Sie den letzten Wert des EMA kennen. – Sputnik2513

+0

Wie Paul sagte, können Sie Scan verwenden, mit dem Sie aggregieren können, indem Sie mit einem Seed- "Status" beginnen und für jeden OnNext eine Aktion für diesen Status ausführen. Sie können dann aus dem Status den Wert auswählen, den Sie außerhalb der Sequenz veröffentlichen möchten. Wenn Sie jedoch Zugriff auf mehr als nur das aktuelle Aggregat und den neuesten Wert benötigen, können die Fensteroperatoren nützlich sein. –

7

So können Sie den Status einer Sequenz zuordnen. In diesem Fall berechnet es den Durchschnitt der letzten 10 Werte.

var movingAvg = noisySequence.Scan(new List<double>(), 
(buffer, value)=> 
{ 
    buffer.Add(value); 
    if(buffer.Count>MaxSize) 
    { 
     buffer.RemoveAt(0); 
    } 
    return buffer; 
}).Select(buffer=>buffer.Average()); 

Aber Sie könnten Fenster verwenden (welcher Puffer eine Art Verallgemeinerung ist), um Ihren Durchschnitt auch zu erhalten.

noisySequence.Window(10) 
    .Select(window=>window.Average()) 
    .SelectMany(averageSequence=>averageSequence); 
+0

Tangential: Ihre Artikel über 'Window',' Buffer', 'Join' und' GroupJoin' waren sehr aufschlussreich - in der Tat glaube ich, dass ich Sie einige Male zitiert habe, um Rx-bezogene Fragen zu beantworten. :) – JerKimball

1

Vielen Dank! Hier ist eine Lösung mit Scan

const double lambda = 0.99; 
    IObservable<double> emaSequence = noisySequence.Scan(Double.NaN, (emaValue, value) => 
     { 
      if (Double.IsNaN(emaValue)) 
      { 
       emaValue = value; 
      } 
      else 
      { 
       emaValue = emaValue*lambda + value*(1-lambda); 
      } 
      return emaValue; 
     }).Select(emaValue => emaValue);