2017-04-04 2 views
2

Ich möchte einen Thread warten (blockieren) bis entweder eine Zeit verstrichen ist oder ein anderer Stream einen Wert pumpt, dachte ich folgendes könnte dies zu erreichen, aber es löst eine Ausnahme, weil der erste Strom leer ist,Wie warte ich auf einen Wert oder bis eine feste Zeit abgelaufen ist mit Reactive Extensions (Rx.Net)

// class level subject manipulated by another thread... 
_updates = new Subject<Unit>(); 
... 
// wait for up to 5 seconds before carrying on...  
var result = Observable.Timer(DateTime.Now.AddSeconds(5)) 
    .TakeUntil(_updates) 
    .Wait(); 

Wie kann ich die Fähigkeit erreichen bis zu 5 Sekunden oder bis der andere Strom pumpt Wert zu blockieren?

Antwort

2

können Sie Observable.Timeout wie folgt verwenden:

var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait(); 

I Take(1) verwenden, da Timeout-Sequenz, nicht nur nächsten Wert produziert abzuschließen erwartet. Bei Zeitüberschreitung wird System.TimeoutException ausgelöst.

Wenn Sie keine Ausnahme wollen - Sie können Catch verwenden, anstatt einen Wert zu bieten:

var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)) 
    .Catch(Observable.Return(default(Unit))).Wait(); 
// should catch specific exception, not all 

Wenn Ihr Unit in der Tat, dass rx Einheit von @Shlomo erwähnt ist - Sie können es wie folgt ändern:

var result = _updates.Select(c => (Unit?) c).Take(1) 
    .Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait(); 

oder fangen Sie einfach diese Ausnahme wie üblich.

+0

Ich weiß nicht, ob es darauf ankommt, sondern 'default (Unit) == == Unit.Default neue Unit() ', so 'Ergebnis' wird immer den gleichen Wert haben, der vorwärts geht. – Shlomo

+0

@Shlomo nicht sicher, was du meinst. Wenn die Aktualisierungssequenz einen Wert erzeugt, wird der Wert dieser Wert sein, kein Standardwert. Auch wenn Einheit eine Klasse ist - Standard (Unit) ist Null. Vielleicht bedeutet OP eine bestimmte Einheit, der ich nicht bewusst bin, aber ich behandle Unit nur als willkürliche Klasse. – Evk

+0

Er bezieht sich wahrscheinlich auf System.Reactive.Unit, eine Struktur, keine Klasse. Siehe http://introtorx.com/Content/v1.0.10621.0/04_CreatingObservableSequences.html#ObservableStart – Shlomo

0

Können Sie zwei Aufgaben erstellen und warten dann? Das wäre wahrscheinlich der "einfachste" Weg, um das zu erreichen.

Bearbeiten: Titel geändert, um zu spezifizieren Reagieren, Antwort würde wahrscheinlich nicht mehr anwendbar sein.

1

Hier ist eine andere Alternative, wenn Sie mit Ausnahmen beschäftigen möchten nicht:

var _updates = new Subject<Unit>(); 

var result = Observable.Merge(
    _updates.Materialize(), 
    Observable.Empty<Unit>() 
     .Delay(TimeSpan.FromSeconds(5)) 
     .Materialize() 
    ) 
    .Take(1) 
    .Wait(); 

switch (result.Kind) 
{ 
    case NotificationKind.OnCompleted: 
     //time's up, or someone called _updates.OnCompleted(). 
     break; 
    case NotificationKind.OnNext: 
     var message = result.Value; 
     //Message received. Handle message 
     break; 
    case NotificationKind.OnError: 
     var exception = result.Exception; 
     //Exception thrown. Handle exception 
     break; 
} 
Verwandte Themen