2017-02-27 4 views
1

ein einfaches Szenario Gegeben:Herzschlag-Muster unter Verwendung von reaktiver Erweiterung

A und B in einem Raum ist, ein Gespräch mit B. Der Raum ist dunkel und B konnte sehen, A. nicht wie B vorstellen konnte, wenn A Pausieren oder A wird aus dem Raum entführt?

Wenn A spricht, bietet A IObservable Talk, dass B abonniert anschließend an Talk.Subscribe (string => verarbeiten was A sagte). B könnte gleichzeitig Observable.Interval Heartbeat als Heartbeat-Überprüfung abonnieren.

Meine Frage ist, was Operator ich nutzen, um sollte zwei IObservable fusionieren/kombinieren, so dass, wenn kein Element aus Diskussion über zwei Elemente von ist Heartbeat, B die A entführt wurde übernehmen wird.

Bitte beachten Sie, dass ich eine Variable vermeiden möchte, um den Zustand zu speichern, da es die Nebenwirkung verursachen kann, wenn ich diese Variable nicht richtig synchronisiere.

Danke,

Antwort

2

einer Zustandsvariablen Sie handeln wollen Stellen Sie sich vor, mit dem Staat sprach die Anzahl der Herzschläge, da ‚A‘ darstellt letzte. Das würde wie folgt aussehen:

var stateObservable = Observable.Merge(     //State represent number of heartbeats since A last spoke 
    aSource.Select(_ => new Func<int, int>(i => 0)),  //When a talks, set state to 0 
    bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) //when b heartbeats, increment state 
) 
    .Scan(0, (state, func) => func(state)); 

Wir Vorfälle von A zu sprechen als Funktion repräsentieren den Zustand 0 und Vorfälle von B heartbeatting als Erhöhen des Zustand zurückgesetzt wird. Wir akkumulieren dann mit der Scan Funktion.

Der Rest ist nun einfach:

var isKidnapped = stateObservable 
    .Where(state => state >= 2) 
    .Take(1); 

isKidnapped.Subscribe(_ => Console.WriteLine("A is kidnapped")); 

EDIT:

Hier ist ein Beispiel mit n A Quellen:

var aSources = new Subject<Tuple<string, Subject<string>>>(); 
var bHeartbeat = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount(); 

var stateObservable = aSources.SelectMany(t => 
     Observable.Merge(
      t.Item2.Select(_ => new Func<int, int>(i => 0)), 
      bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) 
     ) 
     .Scan(0, (state, func) => func(state)) 
     .Where(state => state >= 2) 
     .Take(1) 
     .Select(_ => t.Item1) 
    ); 

stateObservable.Subscribe(s => Console.WriteLine($"{s} is kidnapped")); 
aSources 
    .SelectMany(t => t.Item2.Select(s => Tuple.Create(t.Item1, s))) 
    .Subscribe(t => Console.WriteLine($"{t.Item1} says '{t.Item2}'")); 
bHeartbeat.Subscribe(_ => Console.WriteLine("**Heartbeat**")); 

var a = new Subject<string>(); 
var c = new Subject<string>(); 
var d = new Subject<string>(); 
var e = new Subject<string>(); 
var f = new Subject<string>(); 

aSources.OnNext(Tuple.Create("A", a)); 
aSources.OnNext(Tuple.Create("C", c)); 
aSources.OnNext(Tuple.Create("D", d)); 
aSources.OnNext(Tuple.Create("E", e)); 
aSources.OnNext(Tuple.Create("F", f)); 

a.OnNext("Hello"); 
c.OnNext("My name is C"); 
d.OnNext("D is for Dog"); 
await Task.Delay(TimeSpan.FromMilliseconds(1200)); 
e.OnNext("Easy-E here"); 
a.OnNext("A is for Apple"); 
await Task.Delay(TimeSpan.FromMilliseconds(2200)); 
+0

Dank @Shlomo für Ihre Antwort , gibt es irgendwelche Möglichkeiten, diese Lösung auf n + 1 wh zu skalieren Ist n die Anzahl der Sprecher, die nur zur Laufzeit ermittelt werden können? – LxL

+0

Höchstwahrscheinlich. Könnten Sie das Problem besser definieren? – Shlomo

+0

Derzeit gibt es nur A und B, ich würde gerne sehen, ob die gleiche Lösung für C, D, E, F gelten kann, wenn die Rolle von A wie wirkt. Die Lösung, an die ich bisher gedacht habe, ist eine Liste von IObservable und eine Liste von Subskriptionen, die zur Laufzeit hinzugefügt werden. Gibt es einen eleganteren Weg? – LxL

Verwandte Themen