Ich habe eine asnyc
Funktion, die ich bei jeder Beobachtung in einer IObservable
Sequenz aufrufen möchte und die Lieferung auf jeweils ein Ereignis beschränkt. Der Verbraucher erwartet nicht mehr als eine Nachricht im Flug; und das ist auch der RX-Vertrag, wenn ich es richtig verstehe.Subskribierte beobachtbare Sequenz mit asynchroner Funktion
Betrachten Sie dieses Beispiel:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
Die Consume
Funktion eine 750 ms Verarbeitungszeit fälscht und ob
produziert Ereignisse alle 100 ms. Der obige Code funktioniert, aber ruft task.Wait()
auf einem zufälligen Thread auf. Wenn ich stattdessen wie in der kommentierten Zeile 3 subskribieren, dann wird Consume
mit der gleichen Rate aufgerufen, mit der ob
Ereignisse erzeugt (und ich kann nicht einmal herausfinden, welche Überladung von Subscribe
ich in dieser kommentierten Anweisung verwende, also ist es wahrscheinlich Unsinn).
Wie also liefere ich jeweils ein Ereignis von einer beobachtbaren Sequenz zu einer async
Funktion?
werfen Sie einen Blick auf diese: http://stackoverflow.com/questions/23006852/howto-call-back-async-function-from-rx-subscribe –
@NedStoyanov: Ah, Reijerhs Antwort http: // stackoverflow. com/a/30030553/1149924 funktioniert, sieht aber unglaublich undurchdringlich aus! Ist das die Art und Weise, wie du es in RX machst? – kkm