2015-08-10 13 views
5

Ich habe zwei Observables, eine IObservable<AlertData> und die andere IObservable<SoundRequestData>. AlertData enthält eine Eigenschaft Id, die weiß, welche SoundRequestData dazu gehört. SoundRequestData kennt nur sich selbst und hat eine Id Eigenschaft, die mit der in AlertData übereinstimmen kann.Kombinieren Observablen bedingt

Ich möchte diese beiden Datentypen zu einem neuen Typ AlertDataViewModel kombinieren. Ich kann jedoch nicht sicher sein, dass die Reihenfolge der Daten, die in beiden Observablen ankommen, gleich ist. Die Reihenfolge in der Ausgabe ist mir momentan egal.

Was ich will, ist AlertData mit SoundRequestData übereinstimmen.

Die Art, wie ich es jetzt mache, die funktioniert, aber langsam ist, ist zu warten, bis eine der Observablen fertig ist, alle Daten in eine ObservableCollection holen. Danach beginne ich das andere Observable und baue die Id's zusammen.

Gibt es einen besseren Weg, dies zu tun? Ich denke, das wie das folgende Marmor Diagramm ausgedrückt werden könnte:

Imgur

So a.id=1 Spiele bis zum 3.id=1, b.id=2 Spiele bis zum 4.id=2 und so weiter.

+0

Warum nicht beide Observablen getan warten, bis? – SaphuA

+0

Da viele Daten vorhanden sind, dauert es eine Weile, bis alles abgerufen ist. Daher möchte ich in der Lage sein, die Daten dem Benutzer so schnell wie möglich zu präsentieren. – Cheesebaron

Antwort

2

Zuerst stellen wir eine kleine Erweiterungsmethode für IObserver<T> vor.

public static IObserver<T> Safe<T>(this IObserver<T> observer) 
{ 
    var done = false; 
    return Observer.Create<TResult>(
     value => 
     { 
      if (!done) 
      { 
       observer.OnNext(value); 
      } 
     }, 
     error => 
     { 
      if (!done) 
      { 
       done = true; 
       observer.OnError(error); 
      } 
     }, 
     () => 
     { 
      if (!done) 
      { 
       done = true; 
       observer.OnCompleted(); 
      } 
     }); 
} 

Dies stellt sicher, nur dass der Betrachter in dem Muster genannt wird OnNext*(OnError|OnCompleted) und dass Verstöße gegen, die einfach ignoriert.

Wir können nun den Operator, den Sie beschrieben haben, implementieren, indem Sie Werte aus beiden Sequenzen mit dem Schlüssel puffern und nur dann ausgeben, wenn wir eine Schlüsselübereinstimmung zwischen den beiden Sequenzen haben.

public static IObservable<TResult> Join<T1, T2, TKey, TResult>(
    IObservable<T1> source1, 
    IObservable<T2> source2, 
    Func<T1, TKey> key1, 
    Func<T2, TKey> key2, 
    Func<T1, T2, TResult> selector) 
{ 
    return Observable.Create<TResult>(observer => 
    { 
     var dict1 = new Dictionary<TKey, T1>(); 
     var dict2 = new Dictionary<TKey, T2>(); 
     var gate = new object(); 
     var safeObserver = observer.Safe(); 
     Action<TKey> emit = k => 
     { 
      T1 value1; 
      T2 value2; 
      if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2)) 
      { 
       var result = selector(value1, value2); 
       safeObserver.OnValue(result); 
       dict1.Remove(k); 
       dict2.Remove(k); 
      } 
     }; 
     return new CompositeDisposable(
      source1.Synchronize(gate).Subscribe(
       value1 => 
       { 
        var k = key1(value1); 
        dict1[k] = value1; 
        emit(k); 
       }, 
       safeObserver.OnError, 
       safeObserver.OnCompleted), 
      source2.Synchronize(gate).Subscribe(
       value2 => 
       { 
        var k = key2(value2); 
        dict2[k] = value2; 
        emit(k); 
       }, 
       safeObserver.OnError, 
       safeObserver.OnCompleted)); 
    }); 
} 

Beispiel:

IObservable<AlertData> alertDatas = ...; 
IObservable<SoundRequestData> = soundRequestDatas = ...; 
IObservable<AlertDataViewModel> alertDataViewModels = Join(
    alertDatas, 
    soundRequestDatas, 
    alertData => alertData.Id, 
    soundRequestData => soundRequestData.Id, 
    (alertData, soundRequestData) => new AlertDataViewModel 
    { 
     AlertData = alertData, 
     SoundRequestData = soundRequestData 
    }); 
1

Dies ist nicht die schönste, aber es wird funktionieren.

Es wird diese Klasse zurückkehren, die nur eine Sammlung der ursprünglichen zwei ist:

class Aggregate 
{ 
    public AlertData AlertData {get;set;} 
    public SoundRequestData SoundRequestData { get; set; } 
    public int Id { get { return AlertData == null ? SoundRequestData.Id : AlertData.Id; } } 
} 

Dies ist die Verbindungslogik:

var joined = Observable.Merge( // Convert the two sources into half-filled aggregates and merge them 
     source1.Select(a => new Aggregate() { AlertData = a }), 
     source2.Select(s => new Aggregate() { SoundRequestData = s })) 
    .GroupBy(a => a.Id) 
    // We only need two for each Id 
    .Select(group => group.Take(2)) 
    // This looks ugly, but is just joining the two messages into one 
    .Select(group => group.Aggregate(new Aggregate(), (agg, newData) => new Aggregate() { AlertData = agg.AlertData ?? newData.AlertData, SoundRequestData = agg.SoundRequestData ?? newData.SoundRequestData })) 
    // Back to one stream 
    .Merge(); 
Verwandte Themen