2015-06-29 6 views
7

erzeugt wird Ich habe ein Observable von einem regulären .NET-Ereignis generiert. Das Observable ist heiß - nicht warm - in dem Sinne, dass es beginnt, Werte zu produzieren, noch vor jedem Abonnement, und jedes Mal, wenn jemand abonniert wird es den neuesten produzierten Wert erhalten. Lassen Sie uns dies eventStream nennen.Wählen Observable neuesten Wert, wenn ein Wert von einem anderen Observable

Dann habe ich eine andere Observable, von einer anderen Klasse ausgesetzt, die einige Statusfluss darstellt, so dass jeder neue Wert den aktuellen Zustand von etwas durch diese Klasse verwaltet gibt. Dieses Observable ist auch heiß. Sagen wir es stateStream.

Jedes Mal, wenn die Ereignissequenz einen neuen Wert erzeugt, möchte ich den letzten Wert aus der Statussequenz auswählen (ich würde sagen Beispiel, aber das könnte zu Verwirrung führen). Dies sollte eine neue Sequenz, die Kombination der beiden Werte produzieren, sie dann die Verarbeitung usw.

Das ist, was ich kam mit, aber es scheint nicht, obwohl zu arbeiten:

var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/); 
var stateStream = someDependency.SomeStateStream; 

eventStream.Select(eventValue => 
    stateStream 
    .Take(1) 
    .Select(stateValue => new { Event = eventValue, State = stateValue })) 
    .Switch() 
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State)) 
    .Subscribe(value => /* do something */); 

Die Logik dahinter ist von anderen ähnlichen Szenarien, mit denen ich gearbeitet habe, wo ein neuer Wert von einer Quelle verursacht eine neue Subskription zu laufen, damit eine neue Observable wird zurückgegeben, und schließlich die IObservable<IObservable<...>> wird in eine eindimensionale IObservable wieder verwendet werden Switch() oder ein ähnlicher Operator.
Aber in diesem Fall, von einem schnellen Test scheint es keine neue Subskription zu geben, und nur der erste stateStream Wert wird produziert. Stattdessen möchte ich den ersten Wert (Take(1)) jedes Mal auswählen, das die eventStream feuert.

AFAIK, CombineLatest und Zip kann nicht die Rechnung passen: CombineLatest wird jedes Mal ausgelöst, wenn eine der beiden Sequenzen einen neuen Wert liefert; Zip wird jedesmal ausgelöst, wenn beide Sequenzen einen neuen Wert haben, und typischerweise bedeutet dies, wenn die langsamste der beiden Werte hat. And/Then/When sollte aus dem gleichen Grund wie Zip nicht richtig sein.

Ich habe auch SO Thread combining one observable with latest from another observable überprüft, aber ich denke nicht, dass das hier gelten kann. Nur in einer der Kommentare lesen I

[...] und wirkt dann wie ein Scan CombineLatest, die nur von einer Seite für Benachrichtigungen Filter

und irgendwie bekannt vorkam, aber ich konnte nicht wickle meinen Kopf darum herum.

Antwort

4

Ich glaube, Sie beobachtbare wollen.Probe()

stateSource.Sample(eventSource) 
    .Zip(eventSource,...) 
+0

Ich hatte IntroToRx gelesen, und es war mir nicht klar, dass ich auch anhand einer anderen Observablen Probe nehmen konnte. Das klingt gut. – superjos

+0

Ich versuchte mit diesem Ansatz und das unmittelbare Problem schien gelöst: die Probenahme war effektiv. Und mit dem schließenden 'Zip (eventSource, ...) 'konnte ich auch den Event-Wert * auf den Chian bringen. Dann hatte ich weitere Anforderungen: Ereignisse anders zu verarbeiten, basierend auf dem neuesten Stand, plus einige mehr. Ich war in der Lage, die folgenden @ Enigmatismus Ansatz zu lösen. Obwohl ich nicht versucht habe, diese Anforderung mit Ihrem Ansatz zu erfüllen, sieht es auch so aus. – superjos

+0

... und es funktioniert tatsächlich, nach aktuellen Unit-Tests zu urteilen – superjos

0

Sie benötigen einen Standardwert für den Status-Stream, falls dieser noch nie ausgegeben wurde. Führen Sie diese Standardeinstellung als Argument für MostRecent (I nur verwendet null hier) und verwenden Sie die Überlastung von Zip, die eine IEnumerable erwartet:

eventStream.Zip(
    stateStream.MostRecent(null), 
    (evt,state) => new { Event = evt, State = state }) 
.Subscribe(/* etc */); 
+0

Uh! Ich glaube nicht, dass ich über 'MostRecent' (noch' Latest') Bescheid wusste, ich werde mich darum kümmern müssen! Danke – superjos

+0

Was passiert hier, wenn * stateStream * einen neuen Wert nach einem * eventStream * N-ten Wert und vor dem nächsten, N-ten + 1 erzeugt? Ich brauche die Ausgabe, um in diesem Fall unempfindlich zu sein, und erfasse einfach den neuen Zustandswert, wenn der N-te + 1 Ereigniswert ankommt. Es scheint mir, dass Zippen über MostRecent immer noch emittiert, sobald sich der Zustand ändert. Ich denke, ich muss etwas vermissen – superjos

+0

Nein, der EventStream-Emitting ist das einzige, was ein Ereignis auslöst. Ich schlage vor, Sie versuchen, den Code auszuführen, um zu sehen! Dies ist nicht die normale Überladung von Zip. –

1

Es scheint mir, dass Ihre aktuelle Lösung ist eigentlich ziemlich nahe ist, aber es klingt wie Sie nur die eventStream & stateStream Observablen um tauschen müssen und dann die .Take(1) entfernen.

Versuchen Sie folgendes:

stateStream 
    .Select(stateValue => 
     eventStream 
      .Select(eventValue => new { Event = eventValue, State = stateValue })) 
    .Switch() 
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State)) 
    .Subscribe(value => { /* do something */ }); 

Je nachdem, wie stateStream konfiguriert ist, müssen Sie möglicherweise eine .StartWith(...), um es hinzuzufügen, um die Anfangswerte von eventStream zu bekommen, aber ich denke, dieser Ansatz Ihre Anforderungen abdeckt.

+0

Könnten Sie bitte etwas über den Tausch erfahren? Es sieht so aus, wenn ich 'eventStream' jedes Mal auswähle, wenn sich der Status ändert, dann könnte ich einige' eventStream'-Vorkommnisse verpassen. Vielleicht habe ich das falsch verstanden. Re: 'StartsWith()': während 'someDependency' aufgebaut ist, wird' stateStream' mit 'Replay (1)' und 'Connect()' erzeugt und hat eine eigene 'StartsWith()', also nach der Zeit Es geht hier schon los. Wie auch immer, ich denke darüber nach, dem Stream, der hier gebaut und abonniert wird, einen Anfangswert hinzuzufügen. – superjos

+0

Uhm, eigentlich denkt mehr über diesen Ansatz nach, es könnte gut funktionieren: es sieht so aus, als könnte man tatsächlich die Art und Weise ändern, wie jedes Ereignis verarbeitet wird, basierend auf dem letzten Stand ... was eigentlich das ist, wonach ich suche. – superjos

0

Die neueste Version (Beta) von Rx.Net 2.3.0-beta2 hat WithLatestFrom:

//Only emits when eventStream emits 
eventStream.WithLatestFrom(stateStream.StartWith(defaultState), 
          (evt, state) => new {Event = evt, State = state}) 
      .Subscribe(/*Do something*/); 

Wenn du es nicht in eine Prise unter Verwendung von (Anmerkung ungetestet) füllen können:

public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
    this IObservable<TLeft> source, 
    IObservable<TRight> other, 
    Func<TLeft, TRight, TResult> resultSelector) 
{ 
    return source.Publish(os => 
     other.Select(a => os 
      .Select(b => resultSelector(b,a))) 
      .Switch()); 
} 

Source mit freundlicher Genehmigung von @JamesWorld, wenn ich mich nicht irre.

+0

cool, danke, dass du das vorgeschlagen hast, ich wusste es nicht. – superjos

Verwandte Themen