2013-10-05 6 views
5

Ich beginne die Entwicklung mit Reactive-Erweiterungen (Version 2.1, nur für den Fall) und für meine Beispielanwendung brauche ich eine Sequenz von Int-Werten mit einem Intervall geschoben , dh alle 1 Sekunde.Wie relative Verzögerung in Observable-Sequenz mit Rx (Reaktive Erweiterungen) eingefügt werden

Ich weiß, dass ich eine Sequenz mit Observable.Range<int>(0,10) erstellen kann, aber ich kann nicht herausfinden, wie Sie die relative Zeit zwischen Pushs einstellen. Ich habe versucht Delay() aber verschiebt die Sequenz nur einmal am Anfang.

Ich fand dann Observable.Generate() Methode, die für diese Aufgabe in der nächsten Art und Weise angepasst werden kann:

var delayed = Observable. 
       Generate(0, i => i <= 10, i => i + 1, i => i, 
          i => TimeSpan.FromSeconds(1)); 

Aber das scheint nur für einfache ‚für-jeden-like‘ definierten Sequenzen zu arbeiten. Also, im Allgemeinen ist meine Frage, ob wir eine Quellsequenz erhalten und sie mit einem Proxy umhüllen können, der Nachrichten aus der Quelle zieht und sie mit zeitlicher Verzögerung weitergibt?

S--d1--d2--d3--d4--d5-| 
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-| 

P.S. Wenn dieser Ansatz dem Konzept von ReactiveExtensions widerspricht, beachten Sie dies bitte ebenfalls. Ich will es nicht "auf alle Fälle" machen und sie bekommen in Zukunft noch ein paar Designprobleme.

P.P.S General Idea ist sicher zu stellen, dass Ausgangssequenz zwischen den Ereignissen trotz ein bestimmtes Intervall hat, wenn die Eingangssequenz endlich oder unendlich ist und wie oft es schiebt Ereignisse.

Antwort

7

Observable.Interval ist, was Sie sehen möchten. Es wird eine 0 basierend long-Wert von 1, die Sie z.B .:

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x)); 

Sie können dann einen Vorsprung (Select) verwenden angeben jedes Zeitintervall erhöht wird erzeugen, diesen Wert Offset/nach Bedarf ändern.

Mit dem Zip-Operator können Sie auch einen Stream mit einem anderen synchronisieren - Sie können sich das auch ansehen. Zip paart Ereignisse aus zwei Streams zusammen, sodass es im Tempo des derzeit langsamsten Streams ausgegeben wird. Zip ist auch sehr flexibel, es kann eine beliebige Anzahl von Streams zippen oder sogar ein IObservable zu einem IEnumerable zippen. Hier ist ein Beispiel dafür:

var pets = new List<string> { "Dog", "Cat", "Elephant" }; 
var pace = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Zip(pets, (n, p) => p)  
    .Subscribe(x => Console.WriteLine(x),() => Console.WriteLine("Done")); 

Dies schreibt Haustiere in einem Intervall von 1 Sekunde.

In Anbetracht des P.P.S. oben hinzugefügt, werde ich eine andere Antwort geben - ich werde das als Referenz verlassen, da es sowieso eine nützliche Technik ist.

+1

Danke für die Antwort. Der Hack mit Zip() funktioniert fast gut. Ein Fehler ist hier, wenn die "Eingabe" -Sequenz unendlich und zeitlich nicht vorhersagbar ist (d. H. Das Ereignis von Tastenklicks). Wenn der Button nicht angeklickt wurde - sagen wir 10 Sekunden - gab es keine Ereignisse in "input", aber die "timer" Sequenz erzeugte 10 Events. Dann, wenn ich den Knopf schnell anklicke, werden alle neuen Ereignisse von "input" paarweise mit vorhandenen Ereignissen von 'timer' gezippt, folglich wird 'result'-Sequenz auch mit der Geschwindigkeit des Drückens von' input' gezogen, was nicht das ist, was ich wollen. – Antonio

+0

Ich würde hinzufügen wie: _on neues Ereignis 't_i' in' timer', überprüfe, ob es ein Ereignis in 'input' gibt und delete' t_i' falls es nicht_ ist. – Antonio

+0

Das bisschen über einen Knopfdruck ist alles neu für mich :) Es war nicht in Ihrer Frage. Vielleicht können Sie das Problem nur in Bezug auf die tatsächlichen Daten und die gewünschte Benutzererfahrung neu formulieren. –

2

Also, um zu klären, möchten Sie den Ausgang, um die Eingabe mit einer Geschwindigkeit nicht schneller als das Intervall, aber sonst so schnell wie möglich zu drücken.

In diesem Fall versuchen Sie dies. Die input variable Konstruktion ist ein hacky Weg, um eine kurze sporadische Sequenz zu erstellen, die manchmal schneller und manchmal langsamer ist als ein Tempo von 2 Sekunden. Beachten Sie, dass die Ausgabe der Stoppuhr die kleinen Ungenauigkeiten im Timer-Mechanismus anzeigt, den Rx verwendet.

var input = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4); 
input = input.Concat(Observable.Interval(TimeSpan.FromSeconds(5)).Take(2)); 

var interval = TimeSpan.FromSeconds(2); 

var paced = input.Select(i => Observable.Empty<long>() 
             .Delay(interval) 
             .StartWith(i)).Concat(); 

var stopwatch = new Stopwatch(); 
stopwatch.Start(); 
paced.Subscribe(
    x => Console.WriteLine(x + " " + stopwatch.ElapsedMilliseconds), 
    () => Console.WriteLine("Done")); 

Diese Probe wirkt, indem es jedem Tick von dem Eingang in eine Sequenz, die das Projizieren tick als ein einzelnes Ereignis zu Beginn, aber nicht onComplete nicht für das gewünschte Intervall. Der resultierende Strom von Strömen wird dann verkettet. Dieser Ansatz stellt sicher, dass neue Ticks sofort ausgegeben werden, wenn die Ausgabe gerade "gespült" wird, aber ansonsten aufeinander folgend gepuffert wird.

Sie können dies in eine Erweiterungsmethode einfügen, um sie generisch zu machen.

0

Hier ist der einfachste Ansatz zu tun, was Sie wollen:

var delayed = 
    source.Do(x => Thread.Sleep(1000)); 

Es fügt eine zweite Verzögerung, aber es so vor dem ersten Element macht. Sie könnten sicherlich einige Logik einpacken, um nicht eine Verzögerung am Anfang zu setzen. Das wäre nicht zu schwer.


Hier ist eine Alternative, die die Verzögerung auf einen brandneuen Trend einplant.

var delayed = 
    Observable.Create<int>(o => 
    { 
     var els = new EventLoopScheduler(); 
     return source 
      .ObserveOn(els) 
      .Do(x => els.Schedule(() => Thread.Sleep(1000))) 
      .Subscribe(o); 
    }); 
+0

Ich muss sagen, dass ich aus einigen Gründen kein Fan dieses Ansatzes bin. Es ist kein Unit Test gut - im Gegensatz zu Delay haben Sie keine Chance einen Scheduler auf die Wartezeit zu parametrieren. Es blockiert auch einen Thread unnötig.Es verwendet auch Do, das pro Abonnent ausgeführt wird, so dass Sie die Möglichkeit für unerwartete Nebenwirkungen öffnen (stellen Sie sich einen Downstream-Operator vor, der eine doppelte Subskription durchführt). Schließlich, und als Ergebnis der oben genannten, ist es nicht idiomatische Rx –

+0

@JamesWorld - Ja, ich stimme zu, dass meine erste Option nicht die beste ist. Die zweite ist etwas besser, da sie keine vorhandenen Threads blockiert. – Enigmativity

0

Vielleicht, was Sie suchen ist die Buffer Erweiterungsmethode. Es Signatur wird wie folgt definiert:

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source, 
    TimeSpan timeSpan) 

Es wird die Quellsequenz in einer Art und Weise transformiert, dass die Werte so oft wie timeSpan im Batch produziert werden.

0

Ich weiß das eine alte Frage, aber ich denke, ich habe die richtige Antwort.

Das Zippen eines Observable.Timer erzeugt "Ticks", auch wenn das Quell-Observable nichts produziert. Dies bedeutet, dass sobald die Quelle einen anderen Gegenstand produziert, alle Zecken verwendet werden, die bereits produziert wurden und noch nicht verbraucht sind. Dies führt zu einer Observablen, die eine Verzögerung zwischen den Artikeln hinzufügt, wenn der Produzent Artikel mit einer konstanten Rate produziert, die jedoch Teile hervorbringen, wenn der Produzent manchmal länger braucht, um einen Artikel zu produzieren.

Um dies zu umgehen, müssen Sie einen Timer generieren, der nur ein Element zwischen jedem Objekt erzeugt, das von Ihrer Observablen erzeugt wird. Sie können dies mit Observable.Switch wie folgt tun:

var subject = new Subject<Unit>(); 

     var producer = subject.SelectMany(
            _ => 
            { 
             return new[] 
             { 
              Observable.Return(true), 
              Observable.Timer(TimeSpan.FromSeconds(2)) 
                .Select(q => false) 
             }; 
            }) 
           .Switch();    
Verwandte Themen