2014-02-11 5 views
5

Ich habe den folgenden Code (vereinfacht für Buchungszwecke).Convert Event based Code zu Rx

public class SomeDataObject 
{ 
    public delegate void ReadyEventHandler; 
    public delegate void ErrorEventHandler; 

    public event ReadyEventHandler Ready; 
    public event ErrorEventHandler Error; 
    ... 
} 

pubic class ConsumerClass 
{ 
    private SomeDataObject dataObject; 

    private Task<List<string>> GetStrings() 
    { 
     List<string> results = new List<string>(); 
     var tcs = new TaskCompletionSource<List<string>>(); 

     SomeDataObject.ReadyEventHandler ReadyHandler = null; 
     SomeDataObject.ErrorEventHandler ErrorHandler = null; 

     ReadyHandler +=() => 
     { 
      for (int i =0; i < dataObject.ItemCount; i++) 
       results.Add(dataObject[i].ToString()); 

      tcs.TrySetResult(results); 
     } 

     ErrorHandler +=() 
     { 
      tcs.TrySetException(new Exception("oops!"); 
     } 

     dataObject.Ready += ReadyHandler; 
     dataObject.Error += ErrorHandler; 

     dataObject.DoRequest(); 
    } 
} 

Die Idee ist, dass, wenn doRequest Anruf getätigt wird, wird SomeDataObject einige Daten erhalten und entweder die Bereit oder Fehlerereignisse erhöhen (Details nicht wichtig!). Wenn Daten verfügbar sind, gibt der ItemCount an, wie viele Artikel verfügbar sind.

Ich bin neu in Rx und kann kein vergleichbares Beispiel finden. Also ist es möglich, dies in Rx zu konvertieren, so dass IObservable<string> anstelle von Task<List<string>> mit Observable.Create irgendwie zurückgegeben wird?

Grüße Alan

+0

Können Sie uns mehr über die SomeDataObject API erzählen? Es scheint ein bisschen seltsam. Ist es möglich DoRequest() mehr als einmal pro Instanz aufzurufen? Wenn ja, können Anrufe getätigt werden, während sich eine bestehende Anfrage im Flug befindet? Ist es so konzipiert, dass es nur einen Aufruf gibt, aber die Ergebnisse mit mehreren Konsumenten teilt? Es fühlt sich an, als würde DoRequest ein Handle zurückgeben, mit dem die Ergebnisse konsumiert werden können (was eine Aufgabe oder IObservable sein könnte). Wenn Sie einen Fehler haben, erwarten Sie, dass Sie DoRequest() erneut aufrufen können? –

+0

All dies ist wichtig, denn sobald ein Observable-Stream eine Ausnahme hat, werden keine weiteren Ereignisse ausgelöst - es könnte also besser sein, Fehler in TResult darzustellen als die Observable oder Task zu sprengen usw. –

+0

James, DoRequest kann nur aufgerufen werden Einmal. Es behält den internen Zustand bei. Es ist tatsächlich Teil einer externen COM-Bibliothek, über die ich keine Kontrolle habe. –

Antwort

6

Matthews Antwort ist in der Nähe, hat aber einige Probleme. Erstens ist es eifrig, was normalerweise nicht im Sinne von Rx/Functional Programming ist. Als nächstes denke ich, dass Sie in der Lage sein werden, die Ereignis-Handles freizugeben, wenn der Verbraucher disponiert. Schließlich sollte die Verwendung eines Betreffs ein Code-Geruch sein, und dieser Fall verweist auf die beiden oben genannten Probleme :-)

Hier verwende ich Observable.Create (das sollte Ihre # 1 goto Tool in der Toolbox sein, mit Subjekte, die Ihre letzte Zuflucht sind), um sich faul zu verbinden, und bieten auch Trennungs/Freigabe-Ereignisse an, wenn das Abonnement entsorgt wird. Auch dataObject auf einen Parameter der Methode in Erwägung ziehen auch

private IObservable<string> GetStrings() 
{ 
    return Observable.Create<string>(o=> 
    { 
     SomeDataObject.ReadyEventHandler ReadyHandler = null; 
     SomeDataObject.ErrorEventHandler ErrorHandler = null; 

     ReadyHandler +=() => 
     { 
      for (int i =0; i < dataObject.ItemCount; i++) 
       o.OnNext(dataObject[i].ToString()); 

      o.OnCompleted(); 
     } 

     ErrorHandler +=() => 
     { 
      o.OnError(new Exception("oops!")); 
     } 

     dataObject.Ready += ReadyHandler; 
     dataObject.Error += ErrorHandler; 

     dataObject.DoRequest(); 

     return Disposable.Create(()=> 
      { 
       dataObject.Ready -= ReadyHandler; 
       dataObject.Error -= ErrorHandler; 
      }); 
    } 
} 

würde ich. Der gemeinsame Status in einem Async-System ist eine Quelle von Problemen.

+0

Lee, das ist genau das, was ich gesucht habe. Ich hatte es fast, konnte das Konzept aber nicht fassen. dataObject und seine Ereignisse sind tatsächlich Teil einer COM-Bibliothek, über die ich keine Kontrolle habe.Ich war auf der Suche nach einer besseren Möglichkeit, die Ereignisse statt Aufgabe zu behandeln. –

+0

Ihre Lösung funktioniert wunderbar, aber ich frage jetzt, ob ich meinen Code darauf umstellen sollte. Mit Aufgabe kann ich den Anrufer auf die vollständigen Ergebnisse warten lassen - wie kann ich das mit Rx tun. –

+0

Siehe meine Antwort, um zu sehen, dass ich Lees Antwort gezwängt habe, um zu zeigen, wie man eine "Liste " zurückbringt und blockiert, um das Ergebnis mit "Observable.Wait()" zu erhalten. –

0

Ich denke, der folgende Code wird das tun, was Sie wollen. Ein ReplaySubject wird verwendet, um sicherzustellen, dass der Aufrufer alle Ergebnisse erhält, auch wenn die Ereignisse SomeDataObject sofort beginnen.

private IObservable<string> GetStrings() 
{ 
    ReplaySubject<string> results = new ReplaySubject<string>(); 

    SomeDataObject.ReadyEventHandler ReadyHandler = null; 
    SomeDataObject.ErrorEventHandler ErrorHandler = null; 

    ReadyHandler +=() => 
    { 
     for (int i =0; i < dataObject.ItemCount; i++) 
      results.OnNext(dataObject[i].ToString()); 

     results.OnCompleted(); 
    } 

    ErrorHandler +=() 
    { 
     results.OnError(new Exception("oops!")); 
    } 

    dataObject.Ready += ReadyHandler; 
    dataObject.Error += ErrorHandler; 

    dataObject.DoRequest(); 

    return results; 
} 
1

Als Antwort auf Ihre Kommentare zu Lee (perfekt schönen und zecken würdig) Antwort, hier ist, wie seine Antwort zu ändern, um eine einzelne List<string> Antwort und Block dafür zu bekommen:

private IObservable<List<string>> GetStrings(SomeDataObject dataObject) 
{ 
    return Observable.Create<List<string>>(o=> 
    { 
     SomeDataObject.ReadyEventHandler ReadyHandler = null; 
     SomeDataObject.ErrorEventHandler ErrorHandler = null; 

     ReadyHandler =() => 
     { 
      var results = new List<string>(dataObject.ItemCount); 
      for (int i =0; i < dataObject.ItemCount; i++) 
       results.Add(dataObject[i].ToString()); 

      o.OnNext(results); 
      o.OnCompleted(); 
     }; 

     ErrorHandler =() => 
     { 
      o.OnError(new Exception("oops!")); 
     }; 

     dataObject.Ready += ReadyHandler; 
     dataObject.Error += ErrorHandler; 

     dataObject.DoRequest(); 

     return Disposable.Create(()=> 
      { 
       dataObject.Ready -= ReadyHandler; 
       dataObject.Error -= ErrorHandler; 
      }); 
    }); 
} 

Jetzt können Sie blockieren auf dies mit:

var results = GetStrings().Wait(); 

Wenn .NET 4.5, dann in einer async Methode verwenden, können Sie auch tun:

var results = await GetStrings(); 
+0

Embrace the Rx James. Lassen Sie das schreckliche async/erwarte und kaputt. NET Task Sachen sterben den Tod, den es braucht. ;-) –

+0

Ich weigere mich, zu solchen eklatanten Köder Sir zu erheben! :) –