2015-07-11 8 views
5

Ich habe eine Klasse, die einen Strom von Ereignissen aufnimmt und einen weiteren Strom von Ereignissen ausstößt.Mit Reaktive Erweiterungen (RX), ist es möglich, einen "Pause" -Befehl hinzuzufügen?

Alle Ereignisse verwenden Reaktive Erweiterungen (RX). Der eingehende Datenstrom von Ereignissen wird von einer externen Quelle in eine IObserver<T> unter Verwendung von .OnNext geschoben, und der ausgehende Datenstrom von Ereignissen wird unter Verwendung von IObservable<T> und .Subscribe ausgegeben. Ich benutze Subject<T>, um dies hinter den Kulissen zu verwalten.

Ich frage mich, welche Techniken gibt es in RX, um die Ausgabe vorübergehend zu unterbrechen. Dies würde bedeuten, dass eingehende Ereignisse in einer internen Warteschlange aufgebaut würden und wenn sie nicht pausiert würden, würden die Ereignisse wieder herausfließen.

+0

Denken, dass, wenn die Ausgabe angehalten ist, Ereignisse in eine interne Warteschlange umleiten könnte, und wenn die outout angehalten ist, könnte es die Warteschlange auszuspülen. – Contango

+0

Sie haben keinen eigenen IObserver 'implementiert, oder? – Enigmativity

+0

Nein, alles, was ich getan habe, ist die Umwandlung des internen 'Subject ' in einen' IObserver ', so dass die Methode' .OnNext' verfügbar gemacht werden kann. – Contango

Antwort

1

Sie können die Pause/Pause mit einer Observable simulieren.

Sobald Ihr pauseObservable einen 'pausierten' Wert ausgibt, puffern Sie Ereignisse, bis pauseObservable einen 'nicht pausierten' Wert ausgibt.

Hier ist ein Beispiel, das BufferUntil implementation by Dave Sexton und Observable logic by Timothy Shields (vor einiger Zeit aus meiner Frage) verwendet

 //Input events, hot observable 
     var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(i => i.ToString()) 
      .Publish().RefCount(); 

     //Simulate pausing from Keyboard, not actually relevant within this answer 
     var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
      k => KeyPressed += k, k => KeyPressed -= k) 
      .Select(i => i.EventArgs.PressedKey) 
      .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause 
      .DistinctUntilChanged(); 

     //Let events through when not paused 
     var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused}) 
      .Where(i => !i.paused) //not paused 
      .Select(i => i.value) 
      .Subscribe(Console.WriteLine); 

     //When paused, buffer until not paused 
     var pausedEvents = pauseObservable.Where(i => i) 
      .Subscribe(_ => 
       source.BufferUntil(pauseObservable.Where(i => !i)) 
        .Select(i => String.Join(Environment.NewLine, i)) 
        .Subscribe(Console.WriteLine)); 

Raum für Verbesserungen: vielleicht die beiden Abonnements Quelle (pausedEvents und notPausedEvents) als eine Einheit verschmelzen.

2

Hier ist ein einigermaßen einfacher Rx Weg zu tun, was Sie wollen. Ich habe eine Erweiterungsmethode mit dem Namen Pausable erstellt, die eine beobachtbare Quelle und eine zweite beobachtbare Variable von boolean verwendet, die das Observable pausiert oder wieder aufnimmt.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source, 
    IObservable<bool> pauser) 
{ 
    return Observable.Create<T>(o => 
    { 
     var paused = new SerialDisposable(); 
     var subscription = Observable.Publish(source, ps => 
     { 
      var values = new ReplaySubject<T>(); 
      Func<bool, IObservable<T>> switcher = b => 
      { 
       if (b) 
       { 
        values.Dispose(); 
        values = new ReplaySubject<T>(); 
        paused.Disposable = ps.Subscribe(values); 
        return Observable.Empty<T>(); 
       } 
       else 
       { 
        return values.Concat(ps); 
       } 
      }; 

      return pauser.StartWith(false).DistinctUntilChanged() 
       .Select(p => switcher(p)) 
       .Switch(); 
     }).Subscribe(o); 
     return new CompositeDisposable(subscription, paused); 
    }); 
} 

Es kann wie folgt verwendet werden:

var xs = Observable.Generate(
    0, 
    x => x < 100, 
    x => x + 1, 
    x => x, 
    x => TimeSpan.FromSeconds(0.1)); 

var bs = new Subject<bool>(); 

var pxs = xs.Pausable(bs); 

pxs.Subscribe(x => { /* Do stuff */ }); 

Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 
Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 

Nun, das einzige, was ich konnte ganz das, was Sie von Ihrem bedeuten nicht funktioniert „eingehenden Strom von Ereignissen ist ein IObserver<T>“. Streams sind IObservable<T>. Beobachter sind keine Streams. Es hört sich so an, als ob du hier etwas nicht machst. Kannst du deine Frage hinzufügen und bitte weiter erklären?

+0

Danke für Ihre Antwort. Ich habe meine Frage aktualisiert, um den Datenfluss klarer zu machen. – Contango

+0

Wie es ist, lässt dies anscheinend die ersten Werte nicht durch, weil es den falschen Zweig durchläuft und "Werte" (das ist effektiv "Observable.Never", denke ich?) Ich habe es in die Form gehackt, indem ich "Werte" initialisiert habe, um das erste Mal auf Null zu setzen, und beide Zweige eingecheckt habe. Nicht sicher, ob es etwas Eleganteres gibt. – Benjol

+0

Weitere Warnung für zukünftige selbst. 'ReplaySubject' ist der Spawn des Teufels. Wenn Sie es nicht begrenzen (durch Puffergröße oder Zeit), wird es an alles hängen, was es jemals sieht, falls jemand anderes zum Abonnieren kommt. – Benjol

4

Hier ist meine Lösung unter Verwendung von Puffern und Fensterantrieben:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser) 
{ 
    var queue = source.Buffer(pauser.Where(toPause => !toPause), 
           _ => pauser.Where(toPause => toPause)) 
         .SelectMany(l => l.ToObservable()); 

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
         _ => pauser.Where(toPause => !toPause)) 
       .Switch() 
       .Merge(queue); 
} 

Fenster werden bei Zeichnung und jedes Mal, ‚true‘ geöffnet von Pauser Strom empfängt. Es schließt sich, wenn Pauser einen "falschen" Wert liefert.

Puffer macht, was er tun soll, puffert Werte zwischen 'false' und 'true' von pauser. Sobald der Puffer 'wahr' empfängt, gibt er IList von Werten aus, die sofort auf einmal gestreamt werden.

DotNetFiddle Link: https://dotnetfiddle.net/vGU5dJ

+0

Sie müssen wahrscheinlich ein 'pauser.Publish (ps => {...})' und 'ersetzen pauser' mit' ps' in Ihrem Code zu tun, sonst sind Sie vier Abonnements 'pauser' erstellen und je nach Quelle von 'pauser', der die Methode zum Scheitern bringen kann. – Enigmativity

+0

Ja, ich habe es gerade bestätigt. Sie erstellen mehrere Abonnements. – Enigmativity

+0

Ich habe Ihren Code auf https://dotnetfiddle.net/qWN72d umgestaltet. – Enigmativity

Verwandte Themen