WarumReactive Extensions .MaxBy
var a = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish();
a.Subscribe(o =>
{
Console.WriteLine("Test");
});
a.Connect();
Feuer, aber nicht
var a = Observable.Interval(TimeSpan.FromSeconds(1))
.MaxBy(o=>o)
.Publish();
a.Subscribe(o =>
{
Console.WriteLine("Test");
});
a.Connect();
Ich versuche MaxBy in einem anderen Szenario zu verwenden, aber die oben arbeiten nicht einmal zu bekommen.
Das ist mein komplexeres Beispiel
var _telemetryBatchObservable = Observable.FromEventPattern<DeviceStateStreamEventArg>(
ev => DeviceStateStreamEvent += ev,
ev => DeviceStateStreamEvent -= ev)
.Synchronize()
.GroupBy(o => o.EventArgs.DeviceId)
.Select(o => o.MaxBy(i => i.EventArgs.DateTimeOffset))
.SelectMany(o => o.Select(i => i))
.SelectMany(o => o.Select(i => i))
.Buffer(TimeSpan.FromMilliseconds(5000), 100)
.Publish();
Wie soll man wissen, was für nicht-terminierende Quellen funktionieren wird und was nicht? – Murdock
Dokumentation, Experimente. Fast alle Aggregationsoperatoren geben nur bei Beendigung aus: Die einzige Ausnahme ist Scan. Rx ist nach Linq gemustert, das bei aufzählbarer Vervollständigung auch nur einen einzigen Wert ausgibt. – Shlomo
Buffer and Sample funktioniert gut mit Live - Streams ... Ich konnte keine Dokumentation finden, die besagt, dass MaxBy nicht mit nicht - terminierenden Quellen verwendet werden sollte. – Murdock