2012-07-01 3 views
6

Ich habe eine Reihe von Ereignissen kommen und ich muss alle von ihnen ohne Verlust ausführen, aber ich möchte sicherstellen, dass sie zu den entsprechenden Zeitfenstern gepuffert und konsumiert werden. Jeder hat eine Lösung?Was ist der beste Weg, um einen Observable "Rate Limit" zu verbrauchen?

Ich kann keine Operatoren in Rx finden, die das ohne den Verlust der Ereignisse tun können (Throttle - verliert Ereignisse). Ich habe auch über Pufferung, Verzögerung usw. nachgedacht. Kann keine gute Lösung finden.

Ich habe versucht, einen Timer in der Mitte zu setzen, aber irgendwie funktioniert es nicht funktionieren:

GetInitSequence() 
      .IntervalThrottle(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime) 
    { 
     return Observable.Create<T>(o => 
      { 
       return source.Subscribe(x => 
        { 
         new Timer(state => 
          o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1)); 
        }, o.OnError, o.OnCompleted); 
     }); 
    } 
+0

Können Sie eine Marmor hinzufügen digram zeigt was du hast und was du willst? Wie andere bin ich mir nicht sicher, was Sie erreichen möchten, da ich denke, dass Buffer einfach in Ordnung ist. –

+0

Was beschränkst du? – Fredrick

Antwort

10

Die Frage ist nicht 100% klar ist, so dass ich einige Vermutungen mache.

Observable.Delay ist nicht das, was Sie wollen, denn das wird eine Verzögerung von dem Zeitpunkt an, zu dem jedes Ereignis eintrifft, erstellen, anstatt nur Zeitintervalle für die Verarbeitung zu erstellen.

Observable.Buffer ist nicht das, was Sie wollen, weil das dazu führt, dass alle Ereignisse in jedem gegebenen Intervall an Sie weitergegeben werden und nicht einzeln nacheinander.

Also ich glaube, dass Sie nach einer Lösung suchen, die eine Art Metronom erzeugt, die abhebt und Ihnen ein Ereignis pro Zecke gibt. Dies kann naiver mit Observable.Interval für das Metronom konstruiert und Zip dafür Ihre Quelle zu verbinden:

var source = GetInitSequence(); 
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));  
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now)); 

Diese alle 5 Sekunden löst (im Beispiel oben), und geben Sie die ursprünglichen Elemente der Reihe nach.

Das einzige Problem mit dieser Lösung ist, dass, wenn Sie keine Quellenelemente für (sagen wir) 10 Sekunden haben, wenn die Quellenelemente ankommen, werden sie sofort ausgesendet, da einige der 'Trigger'-Ereignisse sitzen Dort warten auf sie. Marmordiagramm für dieses Szenario:

source: -a-b-c----------------------d-e-f-g 
trigger: ----o----o----o----o----o----o----o 
result: ----a----b----c-------------d-e-f-g 

Dies ist ein sehr vernünftiges Problem.Es gibt zwei Fragen hier schon, dass es angehen:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

Die Lösung wird auch eine Haupt Drain Erweiterungsmethode und sekundäre Buffered Erweiterung. Ich habe diese geändert, um viel einfacher zu sein (keine Notwendigkeit für Drain, verwenden Sie einfach Concat). Die Nutzung ist:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5)); 

Die Erweiterung Methode StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 
+0

Danke. Immer noch nicht wirklich, was ich suchte, aber es zeigte mir einige Ideen. Ich bin nur frustriert mit Rx - warum sollte es so komplex sein und keine ordnungsgemäße Dokumentation haben. Die Lernkurve ist steil und erfordert ein breites Wissen über das Thema, um etwas Wertvolles zu erreichen. #fail – IgorM

+1

Einverstanden. Aus diesem Grund habe ich viel Zeit damit verbracht, IntroToRx.com zu schreiben, um Menschen in Ihrer Position zu helfen. Es ist schwer und es gibt viel zu lernen. –

+0

Ich finde wirklich, dass diese Rx-Operatoren schwer zu lesen und zu begründen sind. Ich denke, es ist meine Einschränkung - wahrscheinlich, weil ich einen visuellen Verstand habe und ich das Ergebnis nicht visualisieren kann. Gibt es eine Chance, ein Marmordiagramm für das, was der Code in dieser Antwort tut, zu erhalten? –

0

Wie wäre es Observable.Buffer? Dies sollte alle Ereignisse im 1s-Fenster als einzelnes Ereignis zurückgeben.

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5)); 
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); }); 

Es könnte sein, was Sie fragen, ist nicht klar. Was soll dein Code tun? Es sieht so aus, als würden Sie nur zögern, indem Sie für jedes Ereignis einen Timer erstellen. Es bricht auch die Semantik des Observablen, da das Nächste und das Letzte vor dem nächsten auftreten können.

Beachten Sie, dass dies auch nur bei der verwendeten Zeitschaltuhr so ​​genau ist. Typischerweise sind die Timer auf maximal 16ms genau.

Edit:

Ihr Beispiel wird, und Artikel über alle Ereignisse im Fenster:

GetInitSequence() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 
1

Ich weiß, das ist einfach zu einfach sein könnte, aber würde das funktionieren?

var intervaled = source.Do(x => { Thread.Sleep(100); }); 

Im Grunde legt dies nur eine minimale Verzögerung zwischen den Werten. Zu einfach?

+0

Dies entspricht und behebt das Verhalten der OPs IntervalThrottle ist das wirklich sinnvoll? –

+1

Eeek ... blockierende Threads !? Diese Art von Flügen angesichts von Rx-Prinzipien richtig? –

+0

Ja, in seinem reinsten Sinn ist dies gegen Rx, aber die Anforderung ist zu blockieren –

1

Nach dem Vorbild der Enigmativity Antwort, wenn alles, was Sie tun wollen, nur Verzögerung von einem Timespan alle Werte ist, ich kann nicht sehen, warum Delay nicht der Betreiber wollen Sie

GetInitSequence() 
     .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here 
     .Subscribe(
      item => 
       { 
        Console.WriteLine(DateTime.Now); 
        // Process item 
       } 
     ); 
Verwandte Themen