0

Ich brauche eine Liste von Filtern auf einem Stream zu konfigurieren.Anpassbare Filterung auf einem Strom

Der Strom zunächst hat keine Filter, aber es hat eine Liste der aktiven Filter während der Zeit.

wird jeder Filter eine strenge Gültigkeit haben.

Ein Testfall folgt:

var scheduler = new TestScheduler(); 

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0200.Ms(), 'A'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0400.Ms(), 'B'), 
    ReactiveTest.OnNext(0500.Ms(), 'A'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

// filters 
// A between 70ms -> 550ms 
// B between 330ms -> 400ms 
// modeled as a string observable where: 
// first word is the char to filter 
// second word are the msecs duration of the filter 

var filters = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0070.Ms(), "A 480"), 
    ReactiveTest.OnNext(0330.Ms(), "B 70") 
); 

var expected = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

Können Sie mir vorschlagen, welche die beste Rx-Lösung ist, dies zu tun?

ps: Ich bin mit dem folgenden Erweiterungsmethode

public static class TickExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 
} 
+0

Der Code nicht kompiliert. Ich denke 'Ms()' sollte eine lange zurückgeben. 'filters' sieht so aus, als sollte es ein' CreateColdObservable 'sein. –

+0

@LeeCampbell richtig, ich habe das behoben. – zpul

Antwort

1

Zunächst sollten Sie Ihre Eingaben wie folgt aussehen:

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0200.Ms(), 'A'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line 
    ReactiveTest.OnNext(0500.Ms(), 'A'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

Auch als @LeeCampbell erwähnt, Ihre Ms Erweiterungsmethode sollte Typ zurückgeben long, nicht TimeSpan.

Hier ist die Lösung kam ich mit:

var activeFilters = filters 
    .Select(s => s.Split(' ')) 
    .Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1])))) 
    .Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1)) 
    .MergeCombineLatest(true) 
    .StartWith(new List<char>()); 

var output = activeFilters.Publish(_activeFilters => 
     input.Join(_activeFilters, 
      _ => Observable.Return(1), 
      t => _activeFilters, 
      (c, filterList) => Tuple.Create(c, filterList) 
     ) 
    ) 
    .Where(t => !t.Item2.Contains(t.Item1)) 
    .Select(t => t.Item1); 

var observer = scheduler.CreateObserver<char>(); 
output.Subscribe(observer); 
scheduler.Start(); 

ReactiveAssert.AreElementsEqual(
    expected.Messages, 
    observer.Messages); 

activeFilters ist eine beobachtbare, die derzeit unter Filter eine Liste von Zeichen aussendet. Wenn sich die aktiv gefilterte Liste ändert, gibt activeFilters eine neue Liste aus. Beachten Sie, dass, da das gleiche Zeichen mehrere Filter haben kann, die Liste nicht unbedingt eindeutig ist.

Sobald Sie zu einem bestimmten Zeitpunkt herausfinden können, welche Filter aktiv sind, können Sie diese Liste an den Eingang anschließen.

Der Code erfordert diese Erweiterung Methode, die die System.Collections.Immutable Nuget Paket verwendet:

public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted) 
{ 
    return outer 
     .SelectMany((inner, i) => inner 
      .Materialize() 
      .SelectMany(nt => nt.Kind == NotificationKind.OnNext 
       ? Observable.Return(Tuple.Create(i, nt.Value, true)) 
       : nt.Kind == NotificationKind.OnCompleted 
        ? removeCompleted 
         ? Observable.Return(Tuple.Create(i, default(T), false)) 
         : Observable.Empty<Tuple<int, T, bool>>() 
        : Observable.Throw<Tuple<int, T, bool>>(nt.Exception) 
      ) 
     ) 
     .Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1)) 
     .Select(dict => dict.Values.ToList()); 
} 

MergeCombineLatest eine beobachtbare von Observablen nimmt und gibt eine Liste der aktuellen Werte aus jeder der Kind Observablen. Wenn der Wert removeCompleted wahr ist, wird die Liste bei Abschluss einer untergeordneten Observablen um eins verkleinert. Wenn removeCompleted falsch ist, bleibt der letzte Wert für immer in der Liste und den nachfolgenden Listen.

Wenn es eine freundlichere Art ist, dies zu tun, wäre ich sehr verpflichtet.

+0

Danke für die sehr nützliche Antwort !! Ich werde es so schnell wie möglich überprüfen. – zpul

+0

Inspiriert von Ihrer Lösung endete ich mit dem folgenden. https://gist.github.com/zpul/fcc82cb0e963ab69ebf7ae8fc7d95b2f Was denken Sie darüber? – zpul

+0

Das Großartige an Rx (oder irgendeinem anderen Funktionscode) ist, dass es uns ermöglicht, Nebeneffekte/Zustandsprobleme sowie Thread-Sicherheitsprobleme zu vermeiden. 'Liste ' ist nicht Thread-sicher. Siehe https://msdn.microsoft.com/en-us/library/6sh2ey19.aspx#Anchor_10. Ihr Code könnte Probleme haben, wenn Sie versuchen, einen Filter zur gleichen Zeit hinzuzufügen/zu entfernen, sowie einige andere Funktionen, die versehentlich die Variable 'excludeFilters' manipulieren. – Shlomo

Verwandte Themen