2012-04-02 4 views
7

Sorry, wenn der Titel nicht ganz klar ist, konnte ich mir nichts besseres ...In Rx, wie die neuesten Elemente nach einer bestimmten Zeit gruppiert werden?

ich Benutzereingaben in Form eines IObservable<char> bin empfangen, und ich möchte es ein zu transformieren IObservable<char[]>, indem Sie die Zeichen jedes Mal gruppieren, wenn der Benutzer länger als 1 Sekunde aufhört zu tippen. So zum Beispiel, wenn der Eingang wie folgt:

h 
e 
l 
l 
o 
(pause) 
w 
o 
r 
l 
d 
(pause) 
! 
(pause) 

ich die Ausgabe möge beobachtbar sein:

['h', 'e', 'l', 'l', 'o'] 
['w', 'o', 'r', 'l', 'd'] 
['!'] 

Ich vermute, dass die Lösung ist recht einfach, aber ich kann nicht finden der richtige Ansatz ... Ich habe versucht, Buffer, GroupByUntil, Throttle und ein paar andere, ohne Erfolg zu verwenden.

Jede Idee wäre willkommen!


EDIT: Ich habe etwas, das fast funktioniert:

 _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1))) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

Aber ich brauche die Verzögerung jedes Mal, wenn ein neues Zeichen zurückgesetzt werden eingegeben wird ...

Antwort

7

Buffer und Throttle wäre genug, wenn Ihre Quelle heiß ist. Um es heiß zu machen, können Sie .Publish().RefCount() verwenden, um sicherzustellen, dass Sie nur mit einem Abonnement für die Quelle enden.

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source, 
               TimeSpan dueTime) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    //defer dueTime checking to Throttle 
    var hot = source.Publish().RefCount(); 
    return hot.Buffer(() => hot.Throttle(dueTime)); 
} 
+0

Danke, es funktioniert super und es ist viel eleganter als meine Lösung. Tatsächlich ist meine Quelle bereits heiß (es ist ein 'Betreff ', den ich von Eingabeereignissen füttere); Ich bin mir nicht sicher, was die Auswirkungen der Verwendung von 'Publish(). RefCount()' ist ... –

+0

@ThomasLevesque Wenn Ihre Quelle bereits heiß ist, glaube ich, dass Publish/RefCount nur einige verschwendet Wrapper-Schichten sein wird. Wenn Sie dies als allgemeine Fallfunktion verwenden möchten, würde ich sie wahrscheinlich einfach zurücklassen, es sei denn, sie würden in Ihrer Anwendung ein Problem darstellen.Wenn Sie es nur einmal verwenden, ändern Sie den Parameter in 'hotSource' und hinterlassen Sie eine Notiz in den Dokumentenkommentaren, und Sie sollten Publish/RefCount sicher entfernen können. –

0

OK, ich eine Lösung gefunden:

 Func<IObservable<char>> bufferClosingSelector = 
      () => 
      _input.Timeout(TimeSpan.FromSeconds(1)) 
        .Catch(Observable.Return('\0')) 
        .Where(i => i == '\0'); 
     _input.Buffer(bufferClosingSelector) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

Grundsätzlich drückt die bufferClosingSelector etwas, wenn ein Timeout auftritt, whi ch schließt den aktuellen Puffer. Es gibt wahrscheinlich einen einfacheren und eleganteren Weg, aber es funktioniert ... Ich bin offen für bessere Vorschläge;)

0

Ich schrieb eine Erweiterung vor einiger Zeit zu tun, was Sie wollen - BufferWithInactivity.

Hier ist sie:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

Vielen Dank! Allerdings ist es nicht gerade "einfacher" als meine Lösung;) –

0

Dies sollte arbeiten. Es ist nicht annähernd so prägnant wie Ihre Lösung, da es die Logik durch eine Klasse anstelle von Erweiterungsmethoden implementiert, aber es könnte ein besserer Weg sein, dies zu tun. Kurz gesagt: Jedes Mal, wenn Sie eine char erhalten, fügen Sie sie zu einer List hinzu und starten (neu) einen Timer, der in einer Sekunde abläuft; Wenn der Timer abläuft, benachrichtigen Sie unsere Teilnehmer mit dem List als Array und setzen Sie den Status zurück, damit er für das nächste Mal bereit ist.

class Breaker : IObservable<char[]>, IObserver<char> 
    { 
     List<IObserver<char[]>> observers = new List<IObserver<char[]>>(); 
     List<char> currentChars; 
     DispatcherTimer t; 
     public Breaker(IObservable<char> source) 
     { 
      source.Subscribe(this); 
      t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) }; 
      t.Tick += TimerOver; 
      currentChars = new List<char>(); 
     } 
     public IDisposable Subscribe(IObserver<char[]> observer) 
     { 
      observers.Add(observer); 
      return null; //TODO return a useful IDisposable 
     } 
     public void OnCompleted() 
     { 
      //TODO implement completion logic 
     } 
     public void OnError(Exception e) 
     { 
      //TODO implement error logic 
     } 
     public void OnNext(char value) 
     { 
      currentChars.Add(value); 
      t.Start(); 
     } 
     void TimerOver(object sender, EventArgs e) 
     { 
      char[] chars = currentChars.ToArray(); 
      foreach (var obs in observers) 
       obs.OnNext(chars); 
      currentChars.Clear(); 
      t.Stop(); 
     } 
    } 
Verwandte Themen