2012-04-24 9 views
18

So in den traurigen Tagen von C# 4.0, ich erstellte die folgende "WorkflowExecutor" -Klasse, die asynchrone Workflows in der GUI-Thread durch hacken in IEnumerable "Yield Return" Fortsetzungen auf Observablen warten. Der folgende Code würde also bei button1Click einfach einen einfachen Workflow starten, der den Text aktualisiert, auf das Klicken auf button2 wartet und nach 1 Sekunde eine Schleife ausführt.erwartet auf einer beobachtbaren

public sealed partial class Form1 : Form { 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor(); 

    public Form1() { 
     InitializeComponent(); 
    } 

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() { 
     Text = "Initializing"; 
     var scheduler = new ControlScheduler(this); 
     while (true) { 
      yield return scheduler.WaitTimer(1000); 
      Text = "Waiting for Click"; 
      yield return _button2Subject; 
      Text = "Click Detected!"; 
      yield return scheduler.WaitTimer(1000); 
      Text = "Restarting"; 
     } 
    } 

    void button1_Click(object sender, EventArgs e) { 
     _workflowExecutor.Run(CreateAsyncHandler()); 
    } 

    void button2_Click(object sender, EventArgs e) { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) { 
     _workflowExecutor.Stop(); 
    } 
} 

public static class TimerHelper { 
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) { 
     return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default); 
    } 
} 

public sealed class WorkflowExecutor { 
    IEnumerator<IObservable<Unit>> _observables; 
    IDisposable _subscription; 

    public void Run(IEnumerable<IObservable<Unit>> actions) { 
     _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator(); 
     Continue(); 
    } 

    void Continue() { 
     if (_subscription != null) { 
      _subscription.Dispose(); 
     } 
     if (_observables.MoveNext()) { 
      _subscription = _observables.Current.Subscribe(_ => Continue()); 
     } 
    } 

    public void Stop() { 
     Run(null); 
    } 
} 

Der smart Teil der Idee, „Ausbeute“ Fortsetzungen mit der asynchronen Arbeit zu tun, von Daniel Earwicker der AsyncIOPipe Idee genommen wurde: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/, dann habe ich den reaktiven Rahmen oben drauf.

Jetzt habe ich Probleme beim Umschreiben mit der Async-Funktion in C# 5.0, aber es scheint, als sollte es einfach zu tun sein. Wenn ich Observables in Tasks umwandle, laufen sie nur einmal und die While-Schleife stürzt beim zweiten Mal ab. Jede Hilfe, die das beheben würde, wäre großartig.

All das gesagt/gefragt, was gibt mir der async/awaymechanismus, dass der WorkflowExecutor nicht? Gibt es etwas, was ich mit async tun kann/erwarte, dass ich nicht (mit einer ähnlichen Menge an Code) mit dem WorkflowExecutor tun kann?

+0

Wie genau haben Sie getan, dass die Umstellung auf 'Task's? Wie stürzt der Look ab? – svick

+1

Und 'erwarten' hat viele Vorteile gegenüber dieser Art von Asynchronie, aber einer der großen Unterschiede ist, dass von den Erwartungswerten zurückkehrt. Z.B. 'string s = erwarten client.DownloadStringAsync (url);'. – svick

Antwort

24

Wie Sie bemerkt haben, ist Task eine einmalige Sache, im Gegensatz zu Observables "Stream of Events". Eine gute Möglichkeit, diesen (IMHO) des Denkens ist die 2x2-Chart auf den Rx team's post about 2.0 Beta:

2x2 chart for task vs observable

Je nach Umstand (einmaligen vs. ‚Strom‘ von Ereignissen), beobachtbare halten könnte mehr Sinn machen.

Wenn Sie auf die Reactive 2.0 Beta zugreifen können, können Sie Observable damit erwarten. Zum Beispiel meine eigenen Versuch einer 'async/await' (ungefähre) Version des Codes wäre:

public sealed partial class Form1 : Form 
{ 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 

    private bool shouldRun = false; 

    public Form1() 
    { 
     InitializeComponent(); 
    } 

    async Task CreateAsyncHandler() 
    { 
     Text = "Initializing"; 
     while (shouldRun) 
     { 
      await Task.Delay(1000); 
      Text = "Waiting for Click"; 
      await _button2Subject.FirstAsync(); 
      Text = "Click Detected!"; 
      await Task.Delay(1000); 
      Text = "Restarting"; 
     } 
    } 

    async void button1_Click(object sender, EventArgs e) 
    { 
     shouldRun = true; 
     await CreateAsyncHandler(); 
    } 

    void button2_Click(object sender, EventArgs e) 
    { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) 
    { 
     shouldRun = false; 
    } 
} 
+0

'Task' ist ein einmaliger Gebrauch, aber Sie können' '' erwarten '' '' '' s '' s'''''''''''''''''''''''''''''''''''''''''''''''So sollte es möglich sein, ein erwartetes zu schaffen, das das ganze "IObservable " darstellen kann, nicht nur ein Einzelteil. – svick

+0

Das habe ich im Codebeispiel gemacht. Mit Rx 2.0 können Sie auf Observables warten. Das Standardverhalten gibt das letzte Element der Observable zurück, weshalb es FirstAsync ist. –

22

Wie James erwähnt, können Sie eine IObservable <T> Sequenz warten mit Rx v2.0 Beta starten . Das Verhalten besteht darin, das letzte Element (vor dem OnCompleted) zurückzugeben oder den beobachteten OnError zu werfen. Wenn die Sequenz keine Elemente enthält, wird eine InvalidOperationException ausgegeben.

Hinweis diese verwenden, können Sie alle anderen gewünschten Verhaltensweisen erhalten:

  • das erste Element Get von xs.FirstAsync erwartet()
  • Stellen Sie sicher, es gibt nur einen einzigen Wert von xs.SingleAsync erwartet()
  • Wenn Sie mit einer leeren Sequenz fein sind, erwarten xs.DefaultIfEmpty()
  • Um alle Elemente zu erhalten, erwarten xs.ToArray() oder warten xs.ToList()

Sie können noch mehr Phantasie Dinge, tun das Ergebnis einer Aggregation wie Berechnung, sondern Zwischenwerte beobachten von Do und Scan verwenden:

var xs = Observable.Range(0, 10, Scheduler.Default); 

var res = xs.Scan((x, y) => x + y) 
      .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); }); 

Console.WriteLine("Done! The sum is {0}", await res); 
+1

Dies ist die Information, nach der ich gesucht habe, nachdem ich überrascht war, in einem aktuellen Projekt zu sehen, dass ein IObservable nur gut gebaut. Danke für das Teilen. – jpierson

Verwandte Themen