2012-04-02 10 views
1

Ich werde mit dem einfachsten zu sagen Teil meiner Frage ist da eine Implementierung eines Rx Intersect-Operator da draußen?Rx Intersect-Operator

Grundsätzlich habe ich zwei Ströme, die Werte erzeugen werden. Lassen Sie uns sagen Strom 1 erzeugt: A, B, C, D, E, F, G Und Strom 2 erzeugt: B, D, F

Beide Ströme wird abgeschlossen und sind nicht unendlich (für den Hintergrund: sie vorgesehen sind durch zwei verschiedene Datenquellen, die wir gleichzeitig abgefragt haben).

Hat jemand irgendwelche Empfehlungen, wie man einen asynchronen Intersect-Operator in der Rx-Welt implementiert?

Antwort

3

Soweit ich weiß, gibt es keine "offizielle" Implementierung. Meistens müssten Sie die Werte aus den beiden Quellen sammeln, um sie zu speichern und nach Übereinstimmungen in der anderen Quelle zu suchen. So etwas sollte Ihnen den Einstieg:

<Extension()> 
Public Function Intersect(Of T)(first As IObservable(Of T), 
           second As IObservable(Of T), 
           comparer As IEqualityComparer(Of T) 
           ) As IObservable(Of T) 
    If first Is Nothing Then Throw New ArgumentNullException("first") 
    If second Is Nothing Then Throw New ArgumentNullException("second") 
    If comparer Is Nothing Then Throw New ArgumentException("comparer") 

    Return Observable.Create(Of T)(
     Function(obs) 
      Dim gate As New Object() 
      Dim firstItems As New HashSet(Of T)(comparer) 
      Dim secondItems As New HashSet(Of T)(comparer) 
      Dim firstCompleted, secondCompleted As Boolean 

      Dim disp As New CompositeDisposable(2) 
      disp.Add(first.Subscribe(Sub(v) 
             SyncLock gate 
              firstItems.Add(v) 
              If secondItems.Contains(v) Then obs.OnNext(v) 
             End SyncLock 
            End Sub, 
            AddressOf obs.OnError, 
            Sub() 
             SyncLock gate 
              firstCompleted = True 
              If secondCompleted Then obs.OnCompleted() 
             End SyncLock 
            End Sub)) 
      disp.Add(second.Subscribe(Sub(v) 
              SyncLock gate 
               secondItems.Add(v) 
               If firstItems.Contains(v) Then obs.OnNext(v) 
              End SyncLock 
             End Sub, 
             AddressOf obs.OnError, 
             Sub() 
              SyncLock gate 
               secondCompleted = True 
               If firstCompleted Then obs.OnCompleted() 
              End SyncLock 
             End Sub)) 
      Return disp 
     End Function) 
End Function 

Diese Implementierung wird wiederholt, Streichhölzer, wenn die Eingabe mehrere occurrances enthält, aber erst, nachdem es in den beiden Quellen gefunden wurde. Zum Beispiel

first ----1---2---1----2---1---1----| 
second ----------2----1-----------| 
out ----------2----1-2---1---1----| 

Wenn die Wiederholungen nicht wünschenswert sind, können Sie überprüfen, um zu sehen, dass es nicht in der entsprechenden Quellensammlung ist. Das Abonnement würde zuerst ändern:

first.Subscribe(Sub(v) 
        SyncLock gate 
         'check that the first doesn't already contain this value 
         If firstItems.Add(v) AndAlso 
          secondItems.Contains(v) Then obs.OnNext(v) 
        End SyncLock 
       End Sub, 
       AddressOf obs.OnError, 
       Sub() 
        SyncLock gate 
         firstCompleted = True 
         If secondCompleted Then obs.OnCompleted() 
        End SyncLock 
       End Sub) 

mit dem zweiten Abonnement ähnlich zu ändern.

+0

Dank. Das funktioniert gut. – Damian

1

Kannst du nicht einfach tun:

var intersect = from x in stream1 
       from y in stream2 
       where x == y 
       select x; 
+0

Das war auch mein unmittelbarer Gedanke, Richard. Ich dachte mir, das wäre eine einfache Frage, um dafür zu representieren. Ich habe es verpasst, aber ich habe dir etwas gegeben. :-) – Enigmativity

+0

@Enigmatismus;) –

+2

Zwei mögliche Probleme mit dieser Lösung kommen in den Sinn. Was passiert, wenn stream2 heiß ist und die Übereinstimmung von stream1 nach dem übereinstimmenden Wert in stream2 kommt? Was ist, wenn stream2 kalt ist und ein Abonnement für jeden Wert in stream1 ein Bad Thing (tm) ist? –

4

Hier ist eine andere Implementierung, die für heiße Observablen arbeitet, gibt es Ihnen eine Reihe Kreuzung, so dass, wenn 'c' in beiden Strömen dreimal erscheint, wird es nur einmal erscheinen im Schnittstrom.

IObservable<char> stream1; 
IObservable<char> stream2; 

var intersect = Observable 
    .Merge(stream1.Distinct(), stream2.Distinct()) 
    .GroupBy(c=>c) 
    .SelectMany(g=>g.Skip(1).Take(1)); 

Beispiel:

stream1 ---a---b--c--d--e--a-b-c 
stream2 -b---a---e---------a-b-c 
intersect -----a-b--------e------c 
+0

Schön! Ich dachte, es könnte eine Möglichkeit geben, dies mit Gruppierung zu tun. Ich habe bereits die Antwort markiert, aber das verdient eine Erwähnung. – Damian

+0

Wie kommt es, dass das letzte Element von 'stream2', d. H. Das' c', nicht im 'intersect' Stream ausgegeben wird? –

+0

@MihaiPantea guten Punkt, sollte es. –