2017-09-21 5 views
0

Ich habe etwas von einer besonderen Bedarf/Anforderung für einen beobachtbaren Strom, der ein wenig über die normale Drosselung geht, und ich bin nicht ganz sicher, wie es geht:‚Sliding‘ RX.net .Throttle() Fenster

Grundsätzlich ich habe einen beobachtbaren Strom ursprünglich von einem normalen Ereignisse wie folgt:

var someEventObservable = Observable.FromEventPattern<SomeEventHandler, SomeEventArgs>(
    handler => this.ColumnWidthChanged += handler, 
    handler => this.ColumnWidthChanged -= handler) 
    .Select(_ => Unit.Default); 

Nun, da diese Ereignisse in schnellen Folge passieren kann, und ich muß nur wissen, ob es mindestens einmal innerhalb eines vorgegebenen Zeitrahmens geschehen, würde ich normalerweise verwenden .Throttle(), dh so:

var someThrottledEventObservable = someEventObservable 
    .Throttle(TimeSpan.FromMilliseconds(300)); 

Aber meine tatsächlichen Anforderungen gehen einen Schritt weiter: Wenn ein Ereignis innerhalb dieser Throttling TimeSpan/dueTime ausgelöst UND wenn noch ein anderes Ereignis nach dem ersten Ereignis ausgelöst wird, aber immer noch innerhalb dieser DueTime, möchte ich den gedrosselten Stream von 0 neu beginnen warten Sie noch einmal und warten Sie weitere 300 ms ... und wenn ein anderes Ereignis ausgelöst wurde, starten/verlängern Sie die Zeit erneut ... und so weiter und so weiter. Nur wenn kein anderes Ereignis innerhalb der ursprünglichen oder neu gestarteten TimeSpan (s)/dueTime ausgelöst wurde, sollte die someThrottledEventObservable eine neue Unit-Instanz ergeben.

Ich hoffe, dass das Sinn macht - aber im Grunde möchte/brauche ich einen gedrosselten Strom von Ereignissen, der ein Ereignis immer dann liefert, wenn der Quellstrom für eine bestimmte Zeit keine neuen Ereignisse mehr liefert & wenn neue Ereignisse innerhalb dieser Wartezeit den gedrosselten Stream passieren sollte wieder anfangen zu warten.

Oder: In einem andauernden "Sturm" von Ereignissen .Throttle() alleine ergibt alle 300ms eine neue Einheit (im obigen Beispiel), aber ich möchte genau eine neue Einheit, wenn ein oder mehrere Ereignisse ausgelöst wurden, aber keine neuen innerhalb einer 300ms Abkühlphase danach aufgetreten.

Wie würde ich das tun?

+1

Nicht vertraut mit rx.net Semantik, aber es klingt wie Gas verhält wie [Probe] (http://reactivex.io/documentation/operators/sample.html) statt [debounce] (http: //reactivex.io/documentation/operators/debounce.html), von dem reactivex.io behauptet, es handle sich um eine Operation. Debounce _should_ funktioniert wie beschrieben (basierend auf der Dokumentation), und eine faire Umformulierung Ihrer Frage könnte lauten "wie entprelle ich in Reactive Extensions für .NET". – maxwellb

+1

Ich denke, Throttle macht schon, was Sie erwarten. Es gibt nur das letzte Ereignis aus, wenn nach dem festgelegten Zeitraum kein anderes Ereignis eintritt. – nikoniko

+0

@nikoniko yeah - Ich muss hier etwas falsch machen .. nicht sicher, warum, aber nach dem erneuten Lesen der Dokumentation heißt es eindeutig, dass es sich so verhalten soll, wie ich es will .. anscheinend verursacht etwas anderes Ereignisse, die durchkommen/One wird mehrfach behandelt etc .. –

Antwort

0

Wie @nikoniko bereits erwähnt, wird Gas den Trick machen.

using System; 
using System.Reactive.Linq; 

namespace Printing { 
class Program { 
    static void Main(string[] args) { 
     var source = Observable.Interval(TimeSpan.FromMilliseconds(333)) 
      .Do(i => Console.WriteLine($"new item: {i}")); 
     var sampling = source.Throttle(TimeSpan.FromSeconds(1)) 
      .Do(i => Console.WriteLine($"sampled: {i}")); 

     var subscription = sampling.Subscribe(); 

     Console.ReadLine(); 

     subscription.Dispose(); 

     Console.ReadLine(); 
    } 
} 

} in nichts Resultierende, weil die von der Quelle Ereignisse in zwei Hochfrequenz kommen. Aber wenn Quelle mehr Zeit benötigt ein Element dann die Zeitspanne in Gas gegeben liefern:

using System; 
using System.Reactive.Linq; 

namespace Printing { 
    class Program { 
     static void Main(string[] args) { 
      var source = Observable.Interval(TimeSpan.FromSeconds(1.2)) 
       .Do(i => Console.WriteLine($"{DateTime.Now.ToShortTimeString()}: new item: {i}")); 
      var sampling = source.Throttle(TimeSpan.FromSeconds(1)) 
       .Do(i => Console.WriteLine($"{DateTime.Now.ToShortTimeString()}: {i}")); 

      var subscription = sampling.Subscribe(); 

      Console.ReadLine(); 

      subscription.Dispose(); 

      Console.ReadLine(); 
     } 
    } 
} 

Das Ergebnis erscheint nach der Zeit Drosselung vorbei ist. Wie Sie sehen können, erscheint eine Sekunde, nachdem ein Ereignis in der Quelle ausgelöst wurde, im Ergebnis.

08:32:26: new item: 0 
08:32:27: throttle 0 
08:32:28: new item: 1 
08:32:29: throttle 1 
08:32:30: new item: 2 
08:32:31: throttle 2 
08:32:32: new item: 3 
08:32:33: throttle 3 
08:32:34: new item: 4 
08:32:35: throttle 4 
08:32:36: new item: 5 
08:32:37: throttle 5