erzeugt wird Ich habe ein Observable von einem regulären .NET-Ereignis generiert. Das Observable ist heiß - nicht warm - in dem Sinne, dass es beginnt, Werte zu produzieren, noch vor jedem Abonnement, und jedes Mal, wenn jemand abonniert wird es den neuesten produzierten Wert erhalten. Lassen Sie uns dies eventStream
nennen.Wählen Observable neuesten Wert, wenn ein Wert von einem anderen Observable
Dann habe ich eine andere Observable, von einer anderen Klasse ausgesetzt, die einige Statusfluss darstellt, so dass jeder neue Wert den aktuellen Zustand von etwas durch diese Klasse verwaltet gibt. Dieses Observable ist auch heiß. Sagen wir es stateStream
.
Jedes Mal, wenn die Ereignissequenz einen neuen Wert erzeugt, möchte ich den letzten Wert aus der Statussequenz auswählen (ich würde sagen Beispiel, aber das könnte zu Verwirrung führen). Dies sollte eine neue Sequenz, die Kombination der beiden Werte produzieren, sie dann die Verarbeitung usw.
Das ist, was ich kam mit, aber es scheint nicht, obwohl zu arbeiten:
var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue =>
stateStream
.Take(1)
.Select(stateValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => /* do something */);
Die Logik dahinter ist von anderen ähnlichen Szenarien, mit denen ich gearbeitet habe, wo ein neuer Wert von einer Quelle verursacht eine neue Subskription zu laufen, damit eine neue Observable wird zurückgegeben, und schließlich die IObservable<IObservable<...>>
wird in eine eindimensionale IObservable wieder verwendet werden Switch()
oder ein ähnlicher Operator.
Aber in diesem Fall, von einem schnellen Test scheint es keine neue Subskription zu geben, und nur der erste stateStream
Wert wird produziert. Stattdessen möchte ich den ersten Wert (Take(1)
) jedes Mal auswählen, das die eventStream
feuert.
AFAIK, CombineLatest
und Zip
kann nicht die Rechnung passen: CombineLatest
wird jedes Mal ausgelöst, wenn eine der beiden Sequenzen einen neuen Wert liefert; Zip
wird jedesmal ausgelöst, wenn beide Sequenzen einen neuen Wert haben, und typischerweise bedeutet dies, wenn die langsamste der beiden Werte hat. And/Then/When
sollte aus dem gleichen Grund wie Zip
nicht richtig sein.
Ich habe auch SO Thread combining one observable with latest from another observable überprüft, aber ich denke nicht, dass das hier gelten kann. Nur in einer der Kommentare lesen I
[...] und wirkt dann wie ein Scan CombineLatest, die nur von einer Seite für Benachrichtigungen Filter
und irgendwie bekannt vorkam, aber ich konnte nicht wickle meinen Kopf darum herum.
Ich hatte IntroToRx gelesen, und es war mir nicht klar, dass ich auch anhand einer anderen Observablen Probe nehmen konnte. Das klingt gut. – superjos
Ich versuchte mit diesem Ansatz und das unmittelbare Problem schien gelöst: die Probenahme war effektiv. Und mit dem schließenden 'Zip (eventSource, ...) 'konnte ich auch den Event-Wert * auf den Chian bringen. Dann hatte ich weitere Anforderungen: Ereignisse anders zu verarbeiten, basierend auf dem neuesten Stand, plus einige mehr. Ich war in der Lage, die folgenden @ Enigmatismus Ansatz zu lösen. Obwohl ich nicht versucht habe, diese Anforderung mit Ihrem Ansatz zu erfüllen, sieht es auch so aus. – superjos
... und es funktioniert tatsächlich, nach aktuellen Unit-Tests zu urteilen – superjos