2017-10-02 3 views
3

Ich möchte meine ereignisbasierte Legacy-Methode zu beobachtbaren basierend, aber ich bin ziemlich neu in Rx, so dass ich jetzt stecken.Rx: Warte auf das erste Element für einen bestimmten Zeitraum

Ich habe eine Ereignisquelle, die inzwischen beobachtbar ist. Zu einem bestimmten Zeitpunkt muss ich eine Methode starten, die entweder endet, indem das nächste Element in der Zeile zurückgegeben wird, oder null, wenn das Zeitlimit überschritten wird.

Die ereignisbasierte Ansatz sieht wie folgt aus:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor) 
{ 
    ReaderEvent result = null; 
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken })) 
    { 
     cts.CancelAfter(waitFor); 

     EventHandler<ReaderEvent> localHandler = (o, e) => 
     { 
      if (e.PlaceId == PlaceId) 
      { 
       result = e; 
       cts.Cancel(); 
      } 
     }; 

     ReaderEventHandler += localHandler; 
     try 
     { 
      await Task.Delay(waitFor, cts.Token).ConfigureAwait(false); 
     } 
     catch (OperationCanceledException) { } 
     catch (Exception ex) 
     { 
      //... 
     } 

     ReaderEventHandler -= localHandler; 
    } 

    return result; 
} 

Wie Sie sehen können, ist die Idee, dass die Verzögerung entweder durch die Ankunft der Veranstaltung abgesagt ich für oder Token Quelle am waiting wird abgebrochen durch Konfiguration nach diesem bestimmten Zeitraum. Sehr sauber.

nun die Rx-Version:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor) 
{ 
    ReaderEvent result = null; 

    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId); 

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken })) 
    { 
     cts.CancelAfter(waitFor); 
     using (observable.Subscribe(x => { 
      result = x; 
      cts.Cancel(); 
     { 
      try 
      { 
       await Task.Delay(waitFor, cts.Token).ConfigureAwait(false); 
      } 
      catch (OperationCanceledException) { } 
     } 
    } 
    return result; 
} 

nicht so sauber ... noch schlimmer ... ich auch mit Timeout Erweiterung versucht haben. Aber da es sich um eine einmalige Aufnahme handelt, muss ich irgendwie warten, bevor ich das Abonnement ablege. Der einzige Unterschied wäre, dass der OnError das lokale Token löscht, nicht den eingebauten Mechanismus von CancelAfter.

Gibt es einen Teig/mehr prägnanten (mehr auf Rx) Weg, dies zu tun?

Vielen Dank!

Antwort

1

Warum nicht einfach mit einer einfachen Rx Version des Codes gehen:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor) 
{ 
    return await 
     _OnReaderEvent 
      .Where(r => r.PlaceId == PlaceId) 
      .Buffer(waitFor, 1) 
      .Select(xs => xs.FirstOrDefault()) 
      .FirstOrDefaultAsync() 
      .ToTask(); 
} 
+0

Es endet nicht auf diese Weise. Aber nach dem Hinzufügen einer .FirstOrDefaultAsync() vor. ToTask() hat es funktioniert. Ja, das ist das sauberste von allen. Und ich kann meine Stornierungsmarke nahtlos hinzufügen. Vielen Dank! – ZorgoZ

+0

@ZorgoZ - Es macht keinen Sinn, den Code für die Stornierungstoken zu verwenden, da Sie damit nicht die zugrundeliegende Observable löschen. Es ist ein bisschen sinnlos. – Enigmativity

3

Sie könnten versuchen, mit:

var values = await _OnReaderEvent 
    .Where(r => r.PlaceId == placeId) 
    .Buffer(waitFor, 1) 
    .FirstAsync(); // get list of matching elements during waitFor time 

return values.FirstOrDefault(); // return first element or null if the list is empty 
+0

Nun, es wie in diesem Fall r aussieht, ist IList , aber die andersherum sah vielversprechend aus: var values ​​= erwartet _OnReaderEvent.FirstAsync (r => r.PlaceId == PlaceId) .Buffer (waitFor); Rückgabewerte.FirstOrDefault(); Dennoch scheint Buffer() zu blockieren. Wenn kein Ereignis in der Zeile ist, endet es nicht :( – ZorgoZ

+0

Buffer return Liste , wobei T ist Typ von Observable-Elemente. Es ist eine Liste aller Elemente, die im Zeitfenster von WaitFor produziert wurden und es sollte leere Liste jede WaitFor zurückgeben Wenn Sie Buffer -> FirstAsync tun, erhalten Sie die erste Liste. Siehe https://msdn.microsoft.com/en-us/library/hh229813(v=vs.103).aspx –

+2

Ahh .. hat den Punkt. Dann ist das was ich brauche: var values ​​= erwarten _OnReaderEvent.Where (r => r.PlaceId == PlaceId) .Buffer (waitFor, 1) .FirstAsync(); return values.FirstOrDefault(); Sie haben mich nach rechts gerichtet Richtung! – ZorgoZ

Verwandte Themen