Ich möchte meine ereignisbasierte Legacy-Methode zu beobachtbaren basierend, aber ich bin ziemlich neu in Rx, so dass ich jetzt stecken.Rx: Warte auf das erste Element für einen bestimmten Zeitraum
Ich habe eine Ereignisquelle, die inzwischen beobachtbar ist. Zu einem bestimmten Zeitpunkt muss ich eine Methode starten, die entweder endet, indem das nächste Element in der Zeile zurückgegeben wird, oder null, wenn das Zeitlimit überschritten wird.
Die ereignisbasierte Ansatz sieht wie folgt aus:
public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
ReaderEvent result = null;
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
{
cts.CancelAfter(waitFor);
EventHandler<ReaderEvent> localHandler = (o, e) =>
{
if (e.PlaceId == PlaceId)
{
result = e;
cts.Cancel();
}
};
ReaderEventHandler += localHandler;
try
{
await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
//...
}
ReaderEventHandler -= localHandler;
}
return result;
}
Wie Sie sehen können, ist die Idee, dass die Verzögerung entweder durch die Ankunft der Veranstaltung abgesagt ich für oder Token Quelle am waiting wird abgebrochen durch Konfiguration nach diesem bestimmten Zeitraum. Sehr sauber.
nun die Rx-Version:
public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
ReaderEvent result = null;
var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId);
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
{
cts.CancelAfter(waitFor);
using (observable.Subscribe(x => {
result = x;
cts.Cancel();
{
try
{
await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) { }
}
}
return result;
}
nicht so sauber ... noch schlimmer ... ich auch mit Timeout Erweiterung versucht haben. Aber da es sich um eine einmalige Aufnahme handelt, muss ich irgendwie warten, bevor ich das Abonnement ablege. Der einzige Unterschied wäre, dass der OnError das lokale Token löscht, nicht den eingebauten Mechanismus von CancelAfter.
Gibt es einen Teig/mehr prägnanten (mehr auf Rx) Weg, dies zu tun?
Vielen Dank!
Es endet nicht auf diese Weise. Aber nach dem Hinzufügen einer .FirstOrDefaultAsync() vor. ToTask() hat es funktioniert. Ja, das ist das sauberste von allen. Und ich kann meine Stornierungsmarke nahtlos hinzufügen. Vielen Dank! – ZorgoZ
@ZorgoZ - Es macht keinen Sinn, den Code für die Stornierungstoken zu verwenden, da Sie damit nicht die zugrundeliegende Observable löschen. Es ist ein bisschen sinnlos. – Enigmativity