2017-11-15 5 views
2

Das Szenario sieht wie folgt aus: Ein Gerät, das kommuniziert, gilt als verbunden, wenn es innerhalb kurzer Zeit zum Server zurückwechselt. Ich möchte eine Klasse erstellen, die die Funktionalität enthält, diesen Status zu verfolgen. Bei einem Anruf an das Gerät sollte das Timeout zurückgesetzt werden. Beim Rückruf wird die Verbindung bestätigt, und der Status sollte auf true festgelegt werden. Wenn der Rückruf abgelaufen ist, sollte er auf false gesetzt werden. Aber der nächste Anruf sollte in der Lage sein, das Zeitlimit wieder gleichgültig zum aktuellen Status zu setzen.Wie erreiche ich die Abfolge von Timeouts mit RX?

Ich dachte, dies mit RX mit swith und timeout zu erreichen. Aber ich weiß nicht, warum es aufhört zu arbeiten.

public class ConnectionStatus 
{ 
private Subject<bool> pending = new Subject<bool>(); 
private Subject<bool> connected = new Subject<bool>(); 

public bool IsConnected { get; private set; } 

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15) 
{ 
    pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds))) 
     .Switch() 
     .Subscribe(_ => IsConnected = true, e => IsConnected = false, token); 
} 

public void ConfirmConnected() 
{ 
    connected.OnNext(true); 
} 

public void SetPending() 
{ 
    pending.OnNext(true); 
} 
} 

Dies ist die „Testfall“:

var c = new ConnectionStatus(default(CancellationToken)); 

c.SetPending(); 
await Task.Delay(TimeSpan.FromSeconds(5)); 
c.ConfirmConnected(); 
c.IsConnected.Dump(); // TRUE, OK 

c.SetPending(); 
await Task.Delay(TimeSpan.FromSeconds(5)); 
c.ConfirmConnected(); 
c.IsConnected.Dump(); // TRUE, OK 

c.SetPending(); 
await Task.Delay(TimeSpan.FromSeconds(20)); 
c.IsConnected.Dump(); // FALSE, OK 
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK 

c.SetPending(); 
await Task.Delay(TimeSpan.FromSeconds(10)); 
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK! 

Ich gehe davon aus, dass das Timeout des inneren beobachtbaren auch die äußere beobachtbare anhält. Als outer => Lambda wird nicht mehr aufgerufen. Was ist der richtige Weg?

Danke

Antwort

3

Das Problem ist, dass Timeout im Wesentlichen eine Ausnahme verursacht, dass die Rx-Abonnements Sprengung. Nachdem das Timeout ausgelöst wurde (wie Sie es haben), werden keine weiteren Benachrichtigungen gesendet. Rx-Grammatik ist, dass Sie * OnNext Nachrichten gefolgt von entweder OnCompleted oder einem OnError haben können. Nachdem die OnError von der Timeout gesendet wurde, werden Sie keine weiteren Nachrichten sehen.

Sie müssen die Timeout-Nachricht über OnNext Nachrichten anstelle einer OnError Nachricht gesendet haben. In Ihrem alten Code haben Sie alle OnError in eine falsche und alle OnNext in eine wahre verwandelt. Stattdessen müssen Sie den richtigen neuen IsConnected Wert in OnNext Nachrichten einbetten. Hier ist, wie das zu tun:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15) 
{ 
    pending.Select(_ => connected 
      .Timeout(TimeSpan.FromSeconds(timeoutSeconds)) 
      .Materialize() 
      .Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException) 
       ? Notification.CreateOnNext(false) 
       : n) 
      .Dematerialize() 
      .Take(1) 
     ) 
     .Switch() 
     .Subscribe(b => IsConnected = b, token); 
} 
+0

Vielen Dank. Ich habe Ihren Vorschlag getestet und festgestellt, dass nach 'Select' ein' .FirstOrDefaultAsync() 'benötigt wird. Ohne es wird das Timeout auch nach dem 'connected' mit dem Wert ausgelöst. Ich habe inzwischen eine andere Lösung gefunden: 'pending.Select (_ => connected.Buffer (TimeSpan.FromSeconds (timeoutSeconds), 1) .FirstOrDefaultAsync()) .Switch(). Abonnieren (l => IsConnected = l.Count> 0 , Token); ' – ZorgoZ

+0

Ihre andere Lösung sollte auch funktionieren. – Shlomo

+0

In welchem ​​Fall wurde 'FirstOrDefaultAsync' benötigt? – Shlomo

3

Hier ist eine alternative Möglichkeit, um den Strom von IsConnected Werten zu erzeugen, ohne .TimeOut mit:

public class ConnectionStatus 
{ 
    private Subject<Unit> pending = new Subject<Unit>(); 
    private Subject<Unit> connected = new Subject<Unit>(); 

    public bool IsConnected { get; private set; } 

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15) 
    { 
     pending 
      .Select(outer => 
       Observable.Amb(
        connected.Select(_ => true), 
        Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false))) 
      .Switch() 
      .Subscribe(isConnected => IsConnected = isConnected, token); 
    } 

    public void ConfirmConnected() 
    { 
     connected.OnNext(Unit.Default); 
    } 

    public void SetPending() 
    { 
     pending.OnNext(Unit.Default); 
    } 
} 

Der Observable.Amb Bediener einfach einen Wert annimmt, von welchen beobachtbaren einen Wert erzeugt zuerst - Es ist besser, mit Ausnahmen zu codieren.

Verwandte Themen