2012-05-03 6 views
5

Ich habe eine große Sammlung von einfachen Paar Klassen bekommt:RX verwenden, um Ereignisse zu unterschiedlichen Zeiten auszulösen?

public class Pair { public DateTime Timestamp; public double Value; } 

Sie durch den aufsteigenden Zeitstempel sortiert werden. Ich möchte ein Ereignis mit dem Wert (z. B. Aktion <Doppel>) für jedes Element in der Liste zur richtigen Zeit auslösen. Die Zeiten sind in der Vergangenheit, also muss ich die Zeitstempel so normalisieren, dass der erste in der Liste "jetzt" ist. Können wir das mit den Reactive Extensions so einrichten, dass es nach dem Zeitunterschied zwischen zwei Elementen das nächste Ereignis auslöst?

+0

Haben Sie sich http://reactiveproperty.codeplex.com/ angesehen? – dwerner

Antwort

6

Say pairs ist die Sequenz:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable(); 

Jetzt ist obs Paare als eine geordnete beobachtbar.

Observable.Zip(
    obs, 
    obs.Take(1).Concat(obs), 
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp) 
     .Select(_ => pair1.Value)) 
.Concat() 
.Subscribe(/* Do something here */); 

Der Zip sorgt dafür, dass die absoluten Zeiten in Offsets umgewandelt werden. Es wird die Sequenz aufnehmen und sie mit sich selbst, aber ausgeglichen durch eine, wie

Original 1--2--4--7--11 
Offset 1--1--2--4--7--11 
Joined 0--1--2--3--4 

Dieser neue Wert folgt dann in den Observable.Timer gesetzt wird es den entsprechenden Betrag zu verzögern. Das abschließende Concat flacht das Ergebnis von einem IObservable<IObservable<double>> in einen IObservable<double> ab. Dies setzt voraus, dass Ihre Sequenz bestellt ist.

+0

Schöne Lösung. Ich würde hinzufügen 'var orderedObs = pairs.OrderBy (p => p.Timestamp) .Observable()', um es offensichtlich zu machen, was passieren muss und stattdessen verwenden. Ich habe diese Änderungen vorgenommen. – yamen

+0

Das hilft sehr. Ich habe es verwendet, um historische Daten abzufragen und sie so abzuspielen, wie sie ursprünglich aufgenommen wurden. Ein Simulator, um zu beweisen, dass das neue System funktioniert. –

+0

Ich brauchte eine Weile, um herauszufinden, was genau los war, aber ich verstehe es jetzt. Rx ist ein Mindf ***. Tolle Lösung. +1 – BFree

0

Ich denke, dieses Problem ist interessant, das wäre mein erster Versuch.

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent) 
{ 
    if (pairs == null || !pairs.Any() || pairEvent == null) 
    return; 

    // if we can promise the pairs are already sorted 
    // obviously we don't need this next line 
    pairs = pairs.OrderBy(p => p.Timestamp); 
    var first = pairs .First().Timestamp; 
    var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p }); 

    var start = DateTime.Now; 

    double interval = 250; // 1/4 second 
    Timer timer = new Timer(interval); 

    timer.AutoReset = true; 
    timer.Elapsed += (sender, elapsedArgs) => 
    { 
    var signalTime = elapsedArgs.SignalTime; 
    var elapsedTime = (signalTime - start); 

    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair); 
    wrapped = wrapped.Skip(pairsToTrigger.Count()); 

    if (!wrapped.Any()) 
     timer.Stop(); 

    foreach (var pair in pairsToTrigger) 
     pairEvent(pair.Value);  
    }; 

    timer.Start(); 
} 
+0

Dies ist wirklich unnötig kompliziert gegeben Rx hat Erweiterungen wie "Timer", "Defer" und "Delay". – yamen

+0

@yamen Ich habe Rx nie benutzt, noch plane ich es wirklich. Ich wollte als Herausforderung antworten, wie ich es von Grund auf neu machen sollte, weil ich dachte, es wäre interessant :) Entschuldigung, wenn meine Antwort in diesem Zusammenhang nur Spam ist. – payo

+2

Keine Notwendigkeit, sich zu entschuldigen, ich hoffe, Sie lernen etwas von den Rx-Lösungen oben. Ihre Antwort dient als Beispiel dafür, warum Rx groß ist :-) – yamen

2

Wenn durch „mit Rx“ Sie mir erlauben, nur die Rx-Scheduler zu verwenden, dann ist dies eine sehr einfache Lösung:

Action<double> action = 
    x => 
     Console.WriteLine(x); 

var ts0 = pairs.Select(p => p.Timestamp).Min(); 

pairs 
    .ForEach(p => 
     Scheduler 
      .ThreadPool 
      .Schedule(
       p.Timestamp.Subtract(ts0), 
       () => action(p.Value))); 

Dies verwendet die System.Interactive Erweiterung ForEach, aber man konnte nur verwenden eine reguläre foreach Schleife, um den Scheduler zu laden.

Ich habe den Code durch die folgenden Dummy-Daten getestet:

var pairs = new [] 
{ 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, }, 
}; 

Ich hoffe, das hilft.

+0

Hat der Scheduler eine eigene Warteschlange? Oder würde dieser Code den gesamten Threadpool zerkauen? Ich mache mir nur Sorgen um die Skalierbarkeit dieser Lösung. – Brannon

+0

@Brannon - Wenn ich mich richtig erinnere, verwenden Scheduler intern eine Heap-Sortierung, um die Aktionen in eine Warteschlange zu stellen.Außerdem wird ein Scheduler nur eine Aktion gleichzeitig ausführen und den aktuellen Thread wiederverwenden, wenn eine andere Aktion sofort bereit ist. Sie verwenden also immer nur einen Thread auf einmal. – Enigmativity

Verwandte Themen