2016-05-10 9 views
2

Ich habe eine asnyc Funktion, die ich bei jeder Beobachtung in einer IObservable Sequenz aufrufen möchte und die Lieferung auf jeweils ein Ereignis beschränkt. Der Verbraucher erwartet nicht mehr als eine Nachricht im Flug; und das ist auch der RX-Vertrag, wenn ich es richtig verstehe.Subskribierte beobachtbare Sequenz mit asynchroner Funktion

Betrachten Sie dieses Beispiel:

static void Main() { 
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
    //var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit. 
    var d = ob.Subscribe(x => Consume(x).Wait()); 
    Thread.Sleep(10000); 
    d.Dispose(); 
} 

static async Task<Unit> Consume(long count) { 
    Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}"); 
    await Task.Delay(750); 
    Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}"); 
    return Unit.Default; 
} 

Die Consume Funktion eine 750 ms Verarbeitungszeit fälscht und ob produziert Ereignisse alle 100 ms. Der obige Code funktioniert, aber ruft task.Wait() auf einem zufälligen Thread auf. Wenn ich stattdessen wie in der kommentierten Zeile 3 subskribieren, dann wird Consume mit der gleichen Rate aufgerufen, mit der ob Ereignisse erzeugt (und ich kann nicht einmal herausfinden, welche Überladung von Subscribe ich in dieser kommentierten Anweisung verwende, also ist es wahrscheinlich Unsinn).

Wie also liefere ich jeweils ein Ereignis von einer beobachtbaren Sequenz zu einer async Funktion?

+0

werfen Sie einen Blick auf diese: http://stackoverflow.com/questions/23006852/howto-call-back-async-function-from-rx-subscribe –

+0

@NedStoyanov: Ah, Reijerhs Antwort http: // stackoverflow. com/a/30030553/1149924 funktioniert, sieht aber unglaublich undurchdringlich aus! Ist das die Art und Weise, wie du es in RX machst? – kkm

Antwort

6

Abonnenten sollten nicht lange ausgeführt werden, und daher gibt es keine Unterstützung für die Ausführung von Async-Methoden mit langer Laufzeit in den Subscriber-Handlern.

Betrachten Sie stattdessen Ihre asynchrone Methode als eine beobachtbare Einzelwertsequenz, die einen Wert aus einer anderen Sequenz nimmt. Jetzt können Sie Sequenzen erstellen, für die Rx entwickelt wurde.

Jetzt, da Sie diesen Sprung gemacht haben, werden Sie wahrscheinlich etwas haben, was @Reijher in Howto call back async function from rx subscribe? erstellt.

Die Aufschlüsselung seines Codes ist wie folgt.

//The input sequence. Produces values potentially quicker than consumer 
Observable.Interval(TimeSpan.FromSeconds(1)) 
     //Project the event you receive, into the result of the async method 
     .Select(l => Observable.FromAsync(() => asyncMethod(l))) 
     //Ensure that the results are serialized 
     .Concat() 
     //do what you will here with the results of the async method calls 
     .Subscribe(); 

In diesem Szenario erstellen Sie implizite Warteschlangen. In jedem Problem, bei dem der Hersteller schneller ist als der Verbraucher, muss eine Warteschlange verwendet werden, um Werte während des Wartens zu sammeln. Persönlich bevorzuge ich dies explizit zu machen, indem ich Daten in eine Warteschlange setze. Alternativ können Sie explizit einen Scheduler verwenden, um zu signalisieren, dass es sich um das Threading-Modell handelt, das den Puffer aufnehmen soll.

Dies scheint eine beliebte Hürde (Ausführen von Async in einem Subskriptionshandler) für Rx Newcomer. Es gibt viele Gründe, dass die Anleitung ist, sie nicht in Ihrem Abonnenten zu setzen, zum Beispiel: 1. Sie brechen das Fehlermodell 2. Sie mischen asynchrone Modelle (rx hier, Aufgabe dort) 3. abonnieren ist der Verbraucher einer Zusammensetzung von asynchronen Sequenzen. Eine asynchrone Methode ist nur eine Einzelwertsequenz, daher kann das Ergebnis dieser Ansicht nicht das Ende der Sequenz sein.

UPDATE

Um den Kommentar zu brechen das Fehlermodell ist hier ein Update der OP Probe veranschaulichen.

void Main() 
{ 
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
    var d = ob.Subscribe(
     x => ConsumeThrows(x).Wait(), 
     ex=> Console.WriteLine("I will not get hit")); 

    Thread.Sleep(10000); 
    d.Dispose(); 
} 

static async Task<Unit> ConsumeThrows(long count) 
{ 
    return await Task.FromException<Unit>(new Exception("some failure")); 
    //this will have the same effect of bringing down the application. 
    //throw new Exception("some failure"); 
} 

Hier können wir sehen, ob die OnNext Handler werfen sollte, dann werden wir nicht geschützt durch unsere Rx OnError Handler. Die Ausnahme würde nicht behandelt werden und höchstwahrscheinlich die Anwendung herunterfahren.

+0

Vielen Dank, Ihre Erklärung, wie diese Komposition funktioniert, ist sehr gut! In meinem Fall habe ich nicht viel Auswahl, da der Consumer eine Gegebenheit ist (gRPC-Streams, die als async deklariert sind). Ich erwarte jedoch nicht, dass es unter normalen Netzwerkbedingungen der Engpass ist, und wenn, dann habe ich wahrscheinlich mehr Probleme, als mit einer vernünftigen Wiederherstellungsstrategie gehandhabt werden könnten - ich kann nicht einmal an dieser Stelle implizites Queuing gegenüber e bevorzugen. G. Daten fallen lassen. Ich werde explizite Warteschlangen in Betracht ziehen, aber wie es aussieht, ist diese Lösung für meine Zwecke sicherlich in Ordnung. – kkm

+0

Lee, könntest du bitte klarstellen, was meinst du mit "Brechen des Fehlermodells"? Wenn ich eine Ausnahme von Consume auslege, beendet es die Sequenz mit 'OnError'. Auf den ersten Blick scheinen Fehler zu tun, was sie sollten. Fehle ich etwas? – kkm

+0

Wenn Sie in Ihrem Beispiel in der OP, entweder Fehleranweisung 'neue Exception (" einige Fehler ") hinzufügen;' oder 'zurück warten Task.FromException (neue Ausnahme (" einige Fehler "));' Sie werden zu Fall bringen die Anwendung. Sie werden Ihren 'OnError'-Handler nicht treffen. In dem Beispielcode, in dem die Async-Methode aus dem Subscribe in ein 'Select'-Objekt verschoben wird, wird die Sequenz jedoch mit 'OnError' beendet, das ordnungsgemäß verarbeitet werden kann. –

Verwandte Themen