Ich habe eine Reihe von Ereignissen kommen und ich muss alle von ihnen ohne Verlust ausführen, aber ich möchte sicherstellen, dass sie zu den entsprechenden Zeitfenstern gepuffert und konsumiert werden. Jeder hat eine Lösung?Was ist der beste Weg, um einen Observable "Rate Limit" zu verbrauchen?
Ich kann keine Operatoren in Rx finden, die das ohne den Verlust der Ereignisse tun können (Throttle - verliert Ereignisse). Ich habe auch über Pufferung, Verzögerung usw. nachgedacht. Kann keine gute Lösung finden.
Ich habe versucht, einen Timer in der Mitte zu setzen, aber irgendwie funktioniert es nicht funktionieren:
GetInitSequence()
.IntervalThrottle(TimeSpan.FromSeconds(5))
.Subscribe(
item =>
{
Console.WriteLine(DateTime.Now);
// Process item
}
);
public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
{
return Observable.Create<T>(o =>
{
return source.Subscribe(x =>
{
new Timer(state =>
o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
}, o.OnError, o.OnCompleted);
});
}
Können Sie eine Marmor hinzufügen digram zeigt was du hast und was du willst? Wie andere bin ich mir nicht sicher, was Sie erreichen möchten, da ich denke, dass Buffer einfach in Ordnung ist. –
Was beschränkst du? – Fredrick