2017-08-23 1 views
2

WarumReactive Extensions .MaxBy

var a = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Publish(); 

a.Subscribe(o => 
{ 
    Console.WriteLine("Test"); 
}); 
a.Connect(); 

Feuer, aber nicht

var a = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .MaxBy(o=>o) 
    .Publish(); 

a.Subscribe(o => 
{ 
    Console.WriteLine("Test"); 
}); 
a.Connect(); 

Ich versuche MaxBy in einem anderen Szenario zu verwenden, aber die oben arbeiten nicht einmal zu bekommen.

Das ist mein komplexeres Beispiel

var _telemetryBatchObservable = Observable.FromEventPattern<DeviceStateStreamEventArg>(
     ev => DeviceStateStreamEvent += ev, 
     ev => DeviceStateStreamEvent -= ev) 
    .Synchronize() 
    .GroupBy(o => o.EventArgs.DeviceId) 
    .Select(o => o.MaxBy(i => i.EventArgs.DateTimeOffset)) 
    .SelectMany(o => o.Select(i => i)) 
    .SelectMany(o => o.Select(i => i)) 
    .Buffer(TimeSpan.FromMilliseconds(5000), 100) 
    .Publish(); 

Antwort

3

Max und MaxBy nur einen einzigen Wert emittieren, wenn die beobachtbaren Quelle beendet. Wenn Sie eine nicht terminierende Quelle haben, werden sie niemals emittieren.

Versuchen Sie diesen Code als Gegenbeispiel:

var a = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Take(3) //causes termination after 3 values 
    .MaxBy(o => o) 
    .Publish(); 
+0

Wie soll man wissen, was für nicht-terminierende Quellen funktionieren wird und was nicht? – Murdock

+0

Dokumentation, Experimente. Fast alle Aggregationsoperatoren geben nur bei Beendigung aus: Die einzige Ausnahme ist Scan. Rx ist nach Linq gemustert, das bei aufzählbarer Vervollständigung auch nur einen einzigen Wert ausgibt. – Shlomo

+0

Buffer and Sample funktioniert gut mit Live - Streams ... Ich konnte keine Dokumentation finden, die besagt, dass MaxBy nicht mit nicht - terminierenden Quellen verwendet werden sollte. – Murdock

2

Shlomos genagelt es als .MaxBy kann keinen Wert auf einer nicht abbrechenden Sequenz erzeugen.

Wenn Sie möchten, können Sie .Scan verwenden, um eine Sequenz des maximalen Wertes bis heute zu erstellen.

Sie können dies tun:

var a = 
    Observable 
     .Interval(TimeSpan.FromSeconds(1)) 
     .Select(x => x % 4L) 
     .Scan(long.MinValue, (x, y) => x > y ? x : y) 
     .Publish(); 

a.Subscribe(o => 
{ 
    Console.WriteLine(o); 
}); 
a.Connect(); 

ich in der .Select(x => x % 4L) setzen die Sequenz interessanter zu machen.

Das gibt:

 
0 
1 
2 
3 
3 
3 
3 
3 

Wenn Sie nur produzieren, wenn ein Wert wollen dort ein neues Maximum ist dann ein .DistinctUntilChanged() nach dem .Scan hinzufügen. Schön und einfach.

Verwandte Themen