2012-07-25 11 views
12

Throttle Methode überspringt Werte aus einer beobachtbaren Sequenz, wenn andere zu schnell folgen. Aber ich brauche eine Methode, um sie zu verzögern. Das heißt, ich muss eine minimale Verzögerung zwischen den Elementen festlegen, ohne eine zu überspringen.Throttle Rx.Observable ohne Überspringen von Werten

Praktisches Beispiel: Es gibt einen Webservice, der Anfragen nicht schneller als einmal pro Sekunde annehmen kann; Es gibt einen Benutzer, der Anfragen einzeln oder in Stapeln hinzufügen kann. Ohne Rx erstelle ich eine Liste und einen Timer. Wenn Benutzer Anfragen hinzufügen, füge ich sie zur Liste hinzu. Im Timer-Event überprüfe ich, ob die Liste leer ist. Ist dies nicht der Fall, sende ich eine Anfrage und entferne den entsprechenden Artikel. Mit Schlössern und all dem Zeug. Jetzt, mit Rx, kann ich Subject erstellen, Elemente hinzufügen, wenn Benutzer Anforderungen hinzufügt. Aber ich brauche einen Weg, um sicherzustellen, dass der Web-Service nicht überflutet wird, indem ich Verzögerungen anwende.

Ich bin neu in Rx, also vielleicht fehlt mir etwas offensichtlich.

Antwort

7

ist es eine ziemlich einfache Art und Weise zu tun, was Sie wollen einen EventLoopScheduler verwenden.

Ich begann mit einer Observablen, die zufällig alle 0 bis 3 Sekunden Werte produziert.

var rnd = new Random(); 

var xs = 
    Observable 
     .Generate(
      0, 
      x => x < 20, 
      x => x + 1, 
      x => x, 
      x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0)); 

Nun, diese Ausgangswerte zu machen sofort, wenn der letzte Wert war innerhalb einer Sekunde vor ich dies tat:

var ys = 
    Observable.Create<int>(o => 
    { 
     var els = new EventLoopScheduler(); 
     return xs 
      .ObserveOn(els) 
      .Do(x => els.Schedule(() => Thread.Sleep(1000))) 
      .Subscribe(o); 
    }); 

Diese effektiv die Quelle auf dem EventLoopScheduler beobachtet und legt sie dann schlafen für 1 Sekunde nach jedem OnNext, damit es erst nach dem Aufwachen mit dem nächsten OnNext beginnen kann.

Getestet habe ich, dass es mit diesem Code gearbeitet:

ys 
    .Timestamp() 
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0) 
    .Subscribe(x => Console.WriteLine(x)); 

Ich hoffe, das hilft.

+0

Ist 'Thread.Sleep' nicht schlecht? Ich dachte immer, einen Thread für Dinge auszusetzen, die im Wesentlichen "Timer" sind, ist eine Verschwendung von Ressourcen. –

+0

@VarvaraKalinina - Es passiert auf seinem eigenen Thread und es ist nichts anderes festgefahren. In diesem Fall ist es in Ordnung. – Enigmativity

+0

Ich habe nicht gesagt, dass es darum geht, etwas zu blockieren oder zu blockieren. Es ging um Ressourcenverschwendung. Sie machen einen Thread gewaltsam fast die ganze Zeit nichts. Es könnte andere Dinge tun, wenn es nicht blockiert wurde –

0

Wie wäre es mit einem beobachtbaren Timer aus einer blockierenden Warteschlange zu nehmen? Code unten ist nicht getestet, aber sollten Sie eine Vorstellung davon, was ich meine ...

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ... 

Observable 
    .Timer(new TimeSpan(0,0,1), new EventLoopScheduler()) 
    .Do(i => myWebService.Send(workQueue.Take())); 

// Then just add items to the queue using workQueue.Add(...) 
+0

Diese Lösung wird wahrscheinlich funktionieren und ist ähnlich die „vor rx“ Lösung der Frage, aber es gibt eine Nachteil: Anfragen werden nicht sofort gesendet, auch wenn es möglich ist; nur wenn der Timer tickt. Mit einer Verzögerung von 1 Sekunde ist es nicht sehr wichtig, aber wenn die Verzögerungen zwischen den Anforderungen beispielsweise 1 Minute sind, dann ist das ein Problem. – Athari

+0

@Athari nicht wahr. Der erste Timer-Tick blockiert den Take() und führt den Moment aus, in dem ein Objekt in die Warteschlange eingereiht wird. Sie können versuchen, das Timeout auf 1 Minute einzustellen, um zu sehen, was ich meine. –

+0

Ich habe es ausprobiert und es gibt tatsächlich ein anderes Problem: manchmal werden zwei Gegenstände ohne Verzögerung nacheinander bearbeitet. Eine Vermutung tritt auf, weil während des Wartens auf Take ein weiteres Timer-Ereignis in die Warteschlange gestellt wird, ohne darauf zu warten, dass das vorherige Ereignis die Verarbeitung beendet. Mein Code: http://pastebin.com/RRZ0ffBB – Athari

0

Verwenden Sie einen Puffer.

http://msdn.microsoft.com/en-us/library/hh212130(v=vs.103).aspx

Wie der Name andeutet, puffert es Elemente im beobachtbaren Strom von Ereignissen, ohne irgendein von ihnen „dropping“.

-G

+2

'Buffer' erzeugt eine Liste bei jeder Iteration, aber ich brauche ein einzelnes Element. Im Beispiel der Frage unterstützt der Web-Service keine Batch-Anfragen. – Athari

2

Wie wäre es eine einfache Erweiterung Methode:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 

Verbrauch:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1)); 
0
.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any()) 
.Subscribe(buffer => 
{ 
    foreach(var item in buffer) Console.WriteLine(item) 
}); 
+0

Hier gibt es das gleiche Problem: der Aufwand für die Rückgabe einer Sequenz, wenn Sie genau 1 Element oder nichts benötigen (warten und dann das nächste Element usw.) – abatishchev

3

Ich möchte einen Ansatz vorschlagen bei der Verwendung von Observable.Zip:

// Incoming requests 
var requests = new[] {1, 2, 3, 4, 5}.ToObservable(); 

// defines the frequency of the incoming requests 
// This is the way to emulate flood of incoming requests. 
// Which, by the way, uses the same approach that will be used in the solution 
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;}); 
incomingRequests.Subscribe((number) => 
{ 
    Console.WriteLine($"Request received: {number}"); 
}); 

// This the minimum interval at which we want to process the incoming requests 
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1)); 

// Zipping incoming requests with the interval 
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;}); 

requestsToProcess.Subscribe((number) => 
{ 
    Console.WriteLine($"Request processed: {number}"); 
}); 
+0

Ich war auf der Suche nach einer Lösung für RXJs und das hat auch dort funktioniert . Danke vielmals! :) – Sammi

0

Ich spielte mit dieser und spielte.Zip (wie bereits erwähnt) die einfachste Methode, um:

var stream = "ThisFastObservable".ToObservable(); 
var slowStream = 
    stream.Zip(
     Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay 
     (x, y) => x); // We just care about the original stream value (x), not the interval ticks (y) 

slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}")); 

Ausgang:

T arrived after 00:00:01.0393840 
h arrived after 00:00:00.9787150 
i arrived after 00:00:01.0080400 
s arrived after 00:00:00.9963000 
F arrived after 00:00:01.0002530 
a arrived after 00:00:01.0003770 
s arrived after 00:00:00.9963710 
t arrived after 00:00:01.0026450 
O arrived after 00:00:00.9995360 
b arrived after 00:00:01.0014620 
s arrived after 00:00:00.9993100 
e arrived after 00:00:00.9972710 
r arrived after 00:00:01.0001240 
v arrived after 00:00:01.0016600 
a arrived after 00:00:00.9981140 
b arrived after 00:00:01.0033980 
l arrived after 00:00:00.9992570 
e arrived after 00:00:01.0003520 
Verwandte Themen