2014-04-11 7 views
15

Ich möchte eine asynchrone Funktion innerhalb eines Rx-Abonnements zurückrufen.Wie async-Funktion von RX subscribe zurückrufen?

z. wie das:

public class Consumer 
{ 
    private readonly Service _service = new Service(); 

    public ReplaySubject<string> Results = new ReplaySubject<string>(); 

    public void Trigger() 
    { 
     Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); 
    } 

    public Task RunAsync() 
    { 
     return _service.DoAsync(); 
    } 
} 

public class Service 
{ 
    public async Task<string> DoAsync() 
    { 
     return await Task.Run(() => Do()); 
    } 

    private static string Do() 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(200)); 
     throw new ArgumentException("invalid!"); 
     return "foobar"; 
    } 
} 

[Test] 
public async Task Test() 
{ 
    var sut = new Consumer(); 
    sut.Trigger(); 
    var result = await sut.Results.FirstAsync(); 
} 

Was muss getan werden, um die Ausnahme richtig zu fangen?

+1

Ich dachte nur, dass ich Async an erster Stelle setzen kann. Aber leider löst das nicht das Problem, mit dem ich konfrontiert bin. Ich werde ein expliziteres Beispiel veröffentlichen. –

+0

mögliches Duplikat von [Gibt es eine Möglichkeit, einen Beobachter als async zu abonnieren] (http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-anobserver-as-async) – Zache

Antwort

9

Sie möchten keine async-Methode an Subscribe übergeben, weil dadurch eine async void-Methode erstellt wird. Tun Sie Ihr Bestes, um zu vermeiden async void.

In Ihrem Fall, ich denke, was Sie wollen, ist die async Methode für jedes Element der Sequenz aufrufen und dann alle Ergebnisse zwischenzuspeichern. In diesem Fall verwendet SelectMany die async Methode für jedes Element zu rufen, und Replay zu Cache (plus ein Connect den Ball ins Rollen zu bringen):

public class Consumer 
{ 
    private readonly Service _service = new Service(); 

    public IObservable<string> Trigger() 
    { 
     var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100)) 
      .SelectMany(_ => RunAsync()) 
      .Replay(); 
     connectible.Connect(); 
     return connectible; 
    } 

    public Task<string> RunAsync() 
    { 
     return _service.DoAsync(); 
    } 
} 

Ich habe die Results Eigenschaft aus der Trigger Methode zurückgegeben werden, anstatt , was ich denke, macht mehr Sinn, so dass nun der Test wie folgt aussieht:

[Test] 
public async Task Test() 
{ 
    var sut = new Consumer(); 
    var results = sut.Trigger(); 
    var result = await results.FirstAsync(); 
} 
+1

Das ist eine großartige Lösung. Danke Stephen. –

+0

Was ist, wenn Sie Ausnahmen von RunAsync abfangen wollen? Was ich habe, ist: obs.SelectMany (x => Observable.FromAsync (() => RunAsync()). Catch (Observable.Empty ())) Gibt es einen besseren Weg? –

+0

@VictorGrigoriu: Ich empfehle Ihnen, Ihre eigene Frage zu stellen und sie mit 'system.reactive' zu ​​versehen. –

14

ändern Sie diese an:

Observable.Timer(TimeSpan.FromMilliseconds(100)) 
    .SelectMany(async _ => await RunAsync()) 
    .Subscribe(); 

Subscribe speichert die asynchrone Operation nicht innerhalb des Observable.

+1

Danke Paul, für Ihren Vorschlag. Sehr interessant. Hast du etwas, wo ich ein bisschen mehr darüber lesen kann? –

20

Paul Betts Antwort funktioniert in den meisten Szenarien, aber wenn man den Strom sperren möchten, während für die Asynchron-Funktion warten Sie so etwas wie diese müssen beenden:

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(l => Observable.FromAsync(asyncMethod)) 
      .Concat() 
      .Subscribe(); 

Oder:

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) 
      .Concat() 
      .Subscribe(); 
+1

Es ist ein anderer Anwendungsfall, aber sehr interessant. –

Verwandte Themen