2017-08-02 1 views
4

sagen, dass ich eine Art T haben:Sortieren beobachtbare von vordefinierten Reihenfolge in Reactive Extensions

class T { 
    public int identifier; //Arbitrary but unique for each character (Guids in real-life) 
    public char character; //In real life not a char, but I chose char here for easy demo purposes 
} 

Und ich habe eine vordefinierte geordnete Folge von Kennungen:

int[] identifierSequence = new int[]{ 
    9, 3, 4, 4, 7 
}; 

ich jetzt brauche eine IObservable<T> Ordnung, die erzeugt die folgende Sequenz von Objekten:

{identifier: 3, character 'e'}, 
{identifier: 9, character 'h'}, 
{identifier: 4, character 'l'}, 
{identifier: 4, character 'l'}, 
{identifier: 7, character 'o'} 

So dass Das resultierende IObservable erzeugt hello. Ich möchte ToArray nicht verwenden, da ich Objekte sofort erhalten möchte, sobald sie ankommen und nicht warten bis alles beobachtet wird. Genauer gesagt, würde Ich mag sie so erhalten:

Input: e h l l o 
Output: he l l o 

Was die richtige reaktive Weise sein würde, dies zu tun? Das Beste, was ich tun konnte, ist dies:

Dictionary<int, T> buffer = new Dictionary<int, T>(); 
int curIndex = 0; 

inputObserable.SelectMany(item => 
{ 
    buffer[item.identifier] = item; 

    IEnumerable<ReportTemplate> GetReadyElements() 
    { 
     while (true) 
     { 
      int nextItemIdentifier = identifierSequence[curIndex]; 
      T nextItem; 
      if (buffer.TryGetValue(nextItemIdentifier, out nextItem)) 
      { 
       buffer.Remove(nextItem.identifier); 
       curIndex++; 
       yield return nextItem; 
      } 
      else 
      { 
       break; 
      } 
     } 
    } 

    return GetReadyElements(); 
}); 

EDIT:

Schlomo einige sehr wichtigen Probleme mit meinem Code ausgelöst, weshalb ich seine Antwort als richtig gekennzeichnet. Ich habe einige Änderungen an seinen zu kodieren für sie verwendbar sein:

  • Generisches Kennung und Objekttyp
  • Iteration statt Rekursion Potential Stackoverflow
  • Konvertieren Sie den anonymen Typen zu einem echten auf sehr große Observablen zu verhindern Klasse für die Lesbarkeit
  • Wo immer möglich, Lookup einen Wert in einem Wörterbuch nur einmal und speichern als Variable anstelle des Betrachtens mehrere Male bis
  • Typ fix

Das gibt mir den folgenden Code:

public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc) 
    { 
     var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>()); 
     return source.Scan(initialState, (oldState, item) => 
      { 
       //Function to be called upon receiving new item 
       //If we can pattern match the first item, then it is moved into Output, and concatted continuously with the next possible item 
       //Otherwise, if nothing is available yet, just return the input state 
       OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state) 
       { 
        int index = state.Index; 
        ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer; 
        IList<T> output = new List<T>(); 

        while (index < identifierSequence.Count) 
        { 
         TId key = identifierSequence[index]; 
         ImmutableList<T> nextValues; 
         if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty) 
         { 
          //No values available yet 
          break; 
         } 

         T toOutput = nextValues[nextValues.Count - 1]; 
         output.Add(toOutput); 

         buffer = buffer.SetItem(key, nextValues.RemoveAt(nextValues.Count - 1)); 
         index++; 
        } 

        return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output); 
       } 

       //Before calling the recursive function, add the new item to the buffer 
       TId itemIdentifier = identifierFunc(item); 

       ImmutableList<T> valuesList; 
       if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList)) 
       { 
        valuesList = ImmutableList<T>.Empty; 
       } 
       var remodifiedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item)); 

       return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, remodifiedBuffer, Enumerable.Empty<T>())); 
      }) 
      // Use Dematerialize/Notifications to detect and emit end of stream. 
      .SelectMany(output => 
      { 
       var notifications = output.Output 
        .Select(item => Notification.CreateOnNext(item)) 
        .ToList(); 

       if (output.Index == identifierSequence.Count) 
       { 
        notifications.Add(Notification.CreateOnCompleted<T>()); 
       } 

       return notifications; 
      }) 
      .Dematerialize(); 
    } 

    class OrderByIdentifierSequenceState<T, TId> 
    { 
     //Index shows what T we're waiting on 
     public int Index { get; } 
     //Buffer holds T that have arrived that we aren't ready yet for 
     public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; } 
     //Output holds T that can be safely emitted. 
     public IEnumerable<T> Output { get; } 

     public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output) 
     { 
      this.Index = index; 
      this.Buffer = buffer; 
      this.Output = output; 
     } 
    } 

jedoch dieser Code noch ein paar Probleme hat:

  • Constant Kopieren des Staates (vor allem die ImmutableDictionary), die sehr teuer sein kann . Mögliche Lösung: Pflegen Sie einen separaten Status pro Beobachter statt pro empfangenem Element.
  • Wenn eines oder mehrere der Elemente in nicht in der Quelle beobachtbar sind, wird ein Problem angezeigt. Dies blockiert zur Zeit das bestellte Observable und es wird nie enden. Mögliche Lösungen: Timeout, Ausnahmefehler, wenn die Quelle beobachtbar ist, Rückgabe aller verfügbaren Elemente, wenn die Quelle beobachtbar ist, ...
  • Wenn die Quelle Observable mehr Elemente als enthält, erhalten wir ein Speicherleck. Elemente, die in der Quelle beobachtbar sind, aber nicht in identifierSequence, werden derzeit zum Wörterbuch hinzugefügt, aber nicht gelöscht, bevor die Quellbeobachtungsfunktion abgeschlossen ist. Dies ist ein potenzielles Speicherleck. Mögliche Lösungen: Überprüfen Sie, ob sich das Element in identifierSequence befindet, bevor Sie es zum Wörterbuch hinzufügen, Code umgehen und sofort das Element ausgeben, ...

Meine Lösung:

/// <summary> 
    /// Takes the items from the source observable, and returns them in the order specified in identifierSequence. 
    /// If an item is missing from the source observable, the returned obserable returns items up until the missing item and then blocks until the source observable is completed. 
    /// All available items are then returned in order. Note that this means that while a correct order is guaranteed, there might be missing items in the result observable. 
    /// If there are items in the source observable that are not in identifierSequence, these items will be ignored. 
    /// </summary> 
    /// <typeparam name="T">The type that is produced by the source observable</typeparam> 
    /// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam> 
    /// <param name="source">The source observable</param> 
    /// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param> 
    /// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param> 
    /// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns> 
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc) 
    { 
     if (source == null) 
     { 
      throw new ArgumentNullException(nameof(source)); 
     } 
     if (identifierSequence == null) 
     { 
      throw new ArgumentNullException(nameof(identifierSequence)); 
     } 
     if (identifierFunc == null) 
     { 
      throw new ArgumentNullException(nameof(identifierFunc)); 
     } 

     if (identifierSequence.Count == 0) 
     { 
      return Observable.Empty<T>(); 
     } 

     HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence); 

     return Observable.Create<T>(observer => 
     { 
      //current index of pending item in identifierSequence 
      int index = 0; 
      //buffer of items we have received but are not ready for yet 
      Dictionary<TId, List<T>> buffer = new Dictionary<TId, List<T>>(); 

      return source.Select(
        item => 
        { 
         //Function to be called upon receiving new item 
         //We search for the current pending item in the buffer. If it is available, we yield return it and repeat. 
         //If it is not available yet, stop. 
         IEnumerable<T> GetAvailableOutput() 
         { 
          while (index < identifierSequence.Count) 
          { 
           TId key = identifierSequence[index]; 
           List<T> nextValues; 
           if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0) 
           { 
            //No values available yet 
            break; 
           } 

           yield return nextValues[nextValues.Count - 1]; 

           nextValues.RemoveAt(nextValues.Count - 1); 
           index++; 
          } 
         } 

         //Get the identifier for this item 
         TId itemIdentifier = identifierFunc(item); 

         //If this item is not in identifiersInSequence, we ignore it. 
         if (!identifiersInSequence.Contains(itemIdentifier)) 
         { 
          return Enumerable.Empty<T>(); 
         } 

         //Add the new item to the buffer 
         List<T> valuesList; 
         if (!buffer.TryGetValue(itemIdentifier, out valuesList)) 
         { 
          valuesList = new List<T>(); 
          buffer[itemIdentifier] = valuesList; 
         } 
         valuesList.Add(item); 

         //Return all available items 
         return GetAvailableOutput(); 
        }) 
       .Subscribe(output => 
       { 
        foreach (T cur in output) 
        { 
         observer.OnNext(cur); 
        } 

        if (index == identifierSequence.Count) 
        { 
         observer.OnCompleted(); 
        } 
       },(ex) => 
       { 
        observer.OnError(ex); 
       },() => 
       { 
        //When source observable is completed, return the remaining available items 
        while (index < identifierSequence.Count) 
        { 
         TId key = identifierSequence[index]; 
         List<T> nextValues; 
         if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0) 
         { 
          //No values available 
          index++; 
          continue; 
         } 

         observer.OnNext(nextValues[nextValues.Count - 1]); 

         nextValues.RemoveAt(nextValues.Count - 1); 
         index++; 
        } 

        //Mark observable as completed 
        observer.OnCompleted(); 
       }); 
     }); 
    } 

Antwort

4

Bitte beachten Sie, dass Ihre Implementierung ein paar Probleme hat:

  1. Wenn die beiden ‚l's vor ihrer Zeit zu kommen, wird eine, dann geschluckt hält die ganze Sequenz. Ihr Wörterbuch sollte einer Sammlung zugeordnet sein, nicht einem einzelnen Element.
  2. Es gibt keine OnCompleted Nachricht.
  3. Mehrere Teilnehmer können den Zustand vermasseln. Versuchen Sie, diese (wo GetPatternMatchOriginal ist Ihr Code):

-

var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 }); 

stateMachine.Take(3).Dump(); //Linqpad 
stateMachine.Take(3).Dump(); //Linqpad 

Der erste ouptut ist h e l der zweite Ausgang l o ist. Sie sollten beide h e l ausgeben.

Diese Implementierung behebt diese Probleme und ist auch nebenwirkungsfreie unveränderlichen Datenstrukturen:

public static class X 
{ 
    public static IObservable<T> GetStateMachine(this IObservable<T> source, string identifierSequence) 
    { 
     //State is held in an anonymous type: 
     // Index shows what character we're waiting on, 
     // Buffer holds characters that have arrived that we aren't ready yet for 
     // Output holds characters that can be safely emitted. 
     return source 
      .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() }, 
      (state, item) => 
      { 
       //Function to be called recursively upon receiving new item 
       //If we can pattern match the first item, then it is moved into Output, and concatted recursively with the next possible item 
       //Otherwise just return the inputs 
       (int Index, ImmutableDictionary<int, ImmutableList<T>> Buffer, IEnumerable<T> Output) GetOutput(int index, ImmutableDictionary<int, ImmutableList<T>> buffer, IEnumerable<T> results) 
       { 
        if (index == identifierSequence.Length) 
         return (index, buffer, results); 

        var key = identifierSequence[index]; 
        if (buffer.ContainsKey(key) && buffer[key].Any()) 
        { 
         var toOuptut = buffer[key][buffer[key].Count - 1]; 
         return GetOutput(index + 1, buffer.SetItem(key, buffer[key].RemoveAt(buffer[key].Count - 1)), results.Concat(new[] { toOuptut })); 
        } 
        else 
         return (index, buffer, results); 
       } 

       //Before calling the recursive function, add the new item to the buffer 
       var modifiedBuffer = state.Buffer.ContainsKey(item.Identifier) 
        ? state.Buffer 
        : state.Buffer.Add(item.Identifier, ImmutableList<T>.Empty); 

       var remodifiedBuffer = modifiedBuffer.SetItem(item.Identifier, modifiedBuffer[item.Identifier].Add(item)); 

       var output = GetOutput(state.Index, remodifiedBuffer, Enumerable.Empty<T>()); 
       return new { Index = output.Index, Buffer = output.Buffer, Output = output.Output }; 
      }) 
      // Use Dematerialize/Notifications to detect and emit end of stream. 
      .SelectMany(output => 
      { 
       var notifications = output.Output 
        .Select(item => Notification.CreateOnNext(item)) 
        .ToList(); 
       if (output.Index == identifierSequence.Length) 
        notifications.Add(Notification.CreateOnCompleted<T>()); 
       return notifications; 
      }) 
      .Dematerialize(); 
    } 
} 

dann können Sie es wie so nennen:

var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 }); 
stateMachine.Dump(); //LinqPad 

src.OnNext(new T { Identifier = 4, Character = 'l' }); 
src.OnNext(new T { Identifier = 4, Character = 'l' }); 
src.OnNext(new T { Identifier = 7, Character = 'o' }); 
src.OnNext(new T { Identifier = 3, Character = 'e' }); 
src.OnNext(new T { Identifier = 9, Character = 'h' }); 
+0

Danke für die Antwort, und vor allem Probleme, die Sie mit meinem Code gefunden haben. Ich mag die Art, wie Sie diese Probleme gelöst haben, aber ich kann mir den Leistungsaufwand nicht leisten, das ImmutableDictionary ständig zu kopieren. Ich habe meinen ursprünglichen Beitrag mit einer modifizierten Version Ihres Codes aktualisiert, die den Status pro Beobachter anstatt pro empfangenem Element beibehält. – Wouter

1

Nizza Frage :-)

die mehr identischen Schlüssel gegeben, es sieht aus wie Musterabgleich in einer beliebigen Reihenfolge zu mir. Hier ist, was ich habe:

Bearbeiten: geändert, um erwartete Elemente in einem Wörterbuch nachschlagen.

public static class MyExtensions 
{ 
    public static IObservable<TSource> MatchByKeys<TSource, TKey>(this IObservable<TSource> source, IEnumerable<TKey> keys, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> keyComparer = null) 
    { 
     if (source == null) throw new ArgumentNullException(nameof(source)); 
     if (keys == null) throw new ArgumentNullException(nameof(keys)); 
     if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); 
     if (keyComparer == null) keyComparer = EqualityComparer<TKey>.Default; 

     return Observable.Create<TSource>(observer => 
     { 
      var pattern = new LinkedList<SingleAssignment<TSource>>(); 
      var matchesByKey = new Dictionary<TKey, LinkedList<SingleAssignment<TSource>>>(keyComparer); 
      foreach (var key in keys) 
      { 
       var match = new SingleAssignment<TSource>(); 
       pattern.AddLast(match); 
       LinkedList<SingleAssignment<TSource>> matches; 
       if (!matchesByKey.TryGetValue(key, out matches)) 
       { 
        matches = new LinkedList<SingleAssignment<TSource>>(); 
        matchesByKey.Add(key, matches); 
       } 
       matches.AddLast(match); 
      } 

      if (pattern.First == null) 
      { 
       observer.OnCompleted(); 
       return Disposable.Empty; 
      } 

      var sourceSubscription = new SingleAssignmentDisposable(); 
      Action dispose =() => 
      { 
       sourceSubscription.Dispose(); 
       pattern.Clear(); 
       matchesByKey.Clear(); 
      }; 

      sourceSubscription.Disposable = source.Subscribe(
       value => 
       { 
        try 
        { 
         var key = keySelector(value); 
         LinkedList<SingleAssignment<TSource>> matches; 
         if (!matchesByKey.TryGetValue(key, out matches)) return; 
         matches.First.Value.Value = value; 
         matches.RemoveFirst(); 
         if (matches.First == null) matchesByKey.Remove(key); 

         while (pattern.First != null && pattern.First.Value.HasValue) 
         { 
          var match = pattern.First.Value; 
          pattern.RemoveFirst(); 
          observer.OnNext(match.Value); 
         } 
         if (pattern.First != null) return; 
         dispose(); 
         observer.OnCompleted(); 
        } 
        catch (Exception ex) 
        { 
         dispose(); 
         observer.OnError(ex); 
        } 
       }, 
       error => 
       { 
        dispose(); 
        observer.OnError(error); 
       }, 
       () => 
       { 
        dispose(); 
        observer.OnCompleted(); 
       }); 
      return Disposable.Create(dispose); 
     }); 
    } 

    private sealed class SingleAssignment<T> 
    { 
     public bool HasValue { get; private set; } 

     private T _value; 
     public T Value 
     { 
      get 
      { 
       if (!HasValue) throw new InvalidOperationException("No value has been set."); 
       return _value; 
      } 
      set 
      { 
       if (HasValue) throw new InvalidOperationException("Value has alredy been set."); 
       HasValue = true; 
       _value = value; 
      } 
     } 
    } 
} 

Prüfregeln:

var src = new Subject<T>(); 
var ordered = src.MatchByKeys(new[] { 9, 3, 4, 4, 7 }, t => t.Identifier); 
var result = new List<T>(); 
using (ordered.Subscribe(result.Add)) 
{ 
    src.OnNext(new T { Identifier = 3, Character = 'e' }); 
    src.OnNext(new T { Identifier = 9, Character = 'h' }); 
    src.OnNext(new T { Identifier = 4, Character = 'l' }); 
    src.OnNext(new T { Identifier = 4, Character = 'l' }); 
    src.OnNext(new T { Identifier = 7, Character = 'o' }); 
    src.OnCompleted(); 
} 
Console.WriteLine(new string(result.Select(t => t.Character).ToArray())); 
+0

Danke für die Antwort! Ich mag, wie ordentlich und gut geschrieben Ihr Code ist :) Allerdings, während ich denke, Ihre Antwort könnte weniger Speicher verbrauchen Ich denke, es hat Worst-Case-O (N^2) Leistung wegen der O (N) Look-up in der verknüpfte Liste gegen O (N) in meiner Antwort (O (1) Nachschlagen in den Hashmaps) – Wouter

+0

Völlig einverstanden, liebe Hashing. Die perfekte Lösung würde mehrere Artikel mit demselben Schlüssel/Identifikator verfolgen. Vielleicht morgen. Habe es trotzdem zum Spaß gemacht. – tinudu

+0

Geändert, um es zu machen O (N). Jetzt glücklich. Du? – tinudu

0

Angenommen, Sie haben dies:

IObservable<T> source = new [] 
{ 
    new T() { identifier = 3, character = 'e' }, 
    new T() { identifier = 9, character = 'h'}, 
    new T() { identifier = 4, character = 'l'}, 
    new T() { identifier = 4, character = 'l'}, 
    new T() { identifier = 7, character = 'o'} 
}.ToObservable(); 

int[] identifierSequence = new int[] 
{ 
    9, 3, 4, 4, 7 
}; 

... dann funktioniert das:

IObservable<T> query = 
    source 
     .Scan(new { index = 0, pendings = new List<T>(), outputs = new List<T>() }, (a, t) => 
     { 
      var i = a.index; 
      var o = new List<T>(); 
      a.pendings.Add(t); 
      var r = a.pendings.Where(x => x.identifier == identifierSequence[i]).FirstOrDefault(); 
      while (r != null) 
      { 
       o.Add(r); 
       a.pendings.Remove(r); 
       i++; 
       r = a.pendings.Where(x => x.identifier == identifierSequence[i]).FirstOrDefault(); 
      } 
      return new { index = i, a.pendings, outputs = o }; 
     }) 
     .SelectMany(x => x.outputs); 
Verwandte Themen