2015-03-25 11 views
5

Ich habe eine Klasse, die für die Generierung von Ereignissen in einem häufigen, aber unregelmäßigen Intervall verantwortlich ist, das andere Klassen verbrauchen und bearbeiten müssen. Ich möchte Reactive Extensions für diese Aufgabe verwenden.Wie schiebe ich eine Entität auf eine Rx Observable?

Die Verbraucherseite ist sehr einfach; Ich habe meine Verbraucherklasse IObserver<Payload> und alles scheint gut. Das Problem kommt auf die Erzeugerklasse.

Implementierung IObservable<Payload> direkt (das heißt, für IDisposable Subscribe(IObserver<Payload>) meine eigene Implementierung setzen wird, entsprechend die Dokumentation, nicht zu empfehlen. Es schlägt vor, statt mit dem Observable.Create() Satz von Funktionen zu komponieren. Da meine Klasse für eine lange Zeit laufen, ich habe versucht, mit var myObservable = Observable.Never() einen beobachtbaren erstellen, und dann, wenn ich neue Nutzlasten zur Verfügung habe, myObservable.Publish(payloadData) anrufen. wenn ich das tue, aber ich scheine nicht die OnNext Umsetzung in meinem Verbraucher zu treffen.

ich denke, als ein Workaround, ich kann ein Ereignis in meiner Klasse erstellen und dann erstellen Sie die Observable mit der FromEvent Funktion, aber das scheint wie ein Overly Co mplizierten Ansatz (d. h. es erscheint seltsam, dass die neue Schärfe von Observablen dazu führt, dass Ereignisse funktionieren). Gibt es einen einfachen Ansatz, den ich hier übersehen habe? Wie können Sie Ihre eigenen Observable-Quellen erstellen?

+1

Wenn Sie in der Regel feststellen, dass Sie entweder "IObservable " oder "IObserver " implementieren, dann tun Sie wahrscheinlich etwas falsch. – Enigmativity

Antwort

6

ein new Subject<Payload> erstellen und nennt es OnNext Methode ist ein Ereignis zu senden. Sie können Subscribe zum Thema mit Ihrem Beobachter.

Die Verwendung von Themen wird oft diskutiert. Für eine gründliche Diskussion über die Verwendung von Subjekten (die mit einem Primer verknüpft sind), siehe here - aber zusammenfassend klingt dieser Anwendungsfall gültig (d.h. er erfüllt wahrscheinlich die lokalen + heißen Kriterien).

Nebenbei, Überladungen von Subscribe akzeptieren Delegierten möglicherweise die Notwendigkeit für Sie, eine Implementierung von IObserver<T> bereitzustellen.

+0

Das Konzept des Betreffs und die bereitgestellten Links haben mir wirklich geholfen, dies vollständiger zu verstehen. – GWLlosa

1

Observable.Never() keine Benachrichtigungen auf „Senden“, Sie Observable.Return(yourValue)

verwenden sollten, wenn Sie einen Leitfaden mit konkreten Beispielen muss ich lesen empfehlen Intro to Rx

+0

Ich kann hier sehr verwirrt sein ... Wenn ich Observable.Return verwende, wird das Observable dann nicht nur den einen Wert haben? Ich brauche eine, die für eine Weile am Leben bleibt, damit ich regelmäßig mehr Werte darauf drängen kann, wenn ich sie berechne. – GWLlosa

+0

Sie sollten wirklich "Intro to Rx" lesen, IObservable ist eine Folge von Ereignissen, wenn der Observer OnNext (T-Wert) bekommt, erhalten Sie den neuesten Wert, mit dem Sie arbeiten. – kondas

+1

Gemäß den Dokumenten, "Eine letzte wesentliche Factory-Methode oder primitiven Konstruktor heißt Return. Seine Rolle ist die Darstellung eines einzigen Elements Sequenz, genau wie ein Single-Zelle-Array wäre in der Welt der IEnumerable Sequenzen. Das Verhalten beobachtet von abonnierte Beobachter sind zwei Nachrichten: OnNext mit dem Wert und OnCompleted Signalisierung das Ende der Sequenz wurde empfangen: IObservable source = Observable.Rückkehr (42); Das Ausführen dieses Codes erzeugt die folgende Ausgabe: OnNext: 42 OnCompleted ". Ich möchte nicht das vollständige Signal, ich möchte die Sequenz offen/lebendig bleiben. – GWLlosa

0

Es sei denn, ich auf einen besseren Weg kommen, es zu tun, was ich haben sich für jetzt festgelegt ist die Verwendung eines BlockingCollection.

var _itemsToSend = new BlockingCollection<Payload>(); 
IObservable<MessageQueuePayload> _deliverer = 
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default); 
Verwandte Themen