2017-10-28 2 views
0

Ich stieß auf ein Gegendruckproblem mit RX.net, für das ich keine Lösung finden kann. Ich habe einen beobachtbaren Echtzeit-Stream von Log-Nachrichten.DropQueue-Mechanismus für RX.net

var logObservable = /* Observable stream of log messages */ 

Was ich über eine TCP-Schnittstelle verfügbar machen möchten, die die Echtzeit-Log-Meldungen aus dem logObservable serialisiert, bevor sie über das Netzwerk gesendet werden. Also habe ich folgendes tun:

foreach (var message in logObservable.ToEnumerable()) 
{ 
    // 1. Serialize message 
    // 2. Send it over the wire. 
} 

Das Problem mit den .ToEnumerable() entsteht, wenn ein Gegendruck Szenario geschieht z.B. wenn der Client am anderen Ende den Stream pausiert. Das Problem ist, dass .ToEnumerable() die Elemente zwischenspeichert, was zu einer großen Speicherauslastung führt. Ich bin auf der Suche nach einem Mechanismus etwas wie eine DropQueue, die nur puffert, sagen wir, die letzten 10 Nachrichten z.

var observableStream = logObservable.DropQueue(10).ToEnumerable(); 

Ist dies der richtige Weg, um Art und Weise, dieses Problem zu lösen? Und wissen Sie, wie man einen solchen Mechanismus implementiert, um mögliche Probleme mit Gegendruck zu vermeiden?

+0

'.take (10) .toenumerable()' funktionieren würde wäre es nicht? –

+0

Ich möchte einen kontinuierlichen Strom von Protokollnachrichten über die Leitung. Wenn ich so vorgehe, wie du es vorschlägst, wird es nicht nur 10 Log-Nachrichten benötigen und dann den beobachtbaren Stream vervollständigen? Das Problem, das ich zu lösen versuche, ist, wenn die Clients zu langsam sind, um die Protokollnachrichten abzurufen oder den Stream zu pausieren, sollten sie z.B. 10 Artikel anstelle einer unbegrenzten Anzahl von Artikeln. – SOK

+0

Was ist mit '.Throttle (...)' oder '.Sample (..)'? – Enigmativity

Antwort

0

Meine DropQueue Implementierung:

public static IEnumerable<TSource> ToDropQueue<TSource>(
     this IObservable<TSource> source, 
     int queueSize, 
     Action backPressureNotification = null, 
     CancellationToken token = default(CancellationToken)) 
    { 
     var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize); 
     var isBackPressureNotified = false; 

     var subscription = source.Subscribe(
      item => 
      { 
       var isBackPressure = queue.Count == queue.BoundedCapacity; 

       if (isBackPressure) 
       { 
        queue.Take(); // Dequeue an item to make space for the next one 

        // Fire back-pressure notification if defined 
        if (!isBackPressureNotified && backPressureNotification != null) 
        { 
         backPressureNotification(); 
         isBackPressureNotified = true; 
        } 
       } 
       else 
       { 
        isBackPressureNotified = false; 
       } 

       queue.Add(item); 
      }, 
      exception => queue.CompleteAdding(), 
      () => queue.CompleteAdding()); 

     token.Register(() => { subscription.Dispose(); }); 

     using (new CompositeDisposable(subscription, queue)) 
     { 
      foreach (var item in queue.GetConsumingEnumerable()) 
      { 
       yield return item; 
      } 
     } 
    } 
Verwandte Themen