Für den Anfang sollten Sie in der Regel keine eigenen Objekte IObservable
und IObserver
implementieren. Deine Frage gibt Hinweise darauf, warum. Es ist sehr schwierig, das zugrunde liegende Verhalten zu korrigieren.
Nun gibt jeder Aufruf an IObservable.Subscribe
einen IDisposable
zurück. Dies wird verwendet, wenn der Anrufer von Subscribe
sich von einer Observablen abmelden möchte, bevor eine OnCompleted
oder OnError
aufgerufen wird. Wenn jedoch eine OnCompleted
oder OnError
aufgerufen wird, wird die IDisposable
automatisch entsorgt. Effektiv räumt Rx automatisch auf, wenn eine beobachtbare Sammlung abgeschlossen ist.
Jeder einzelne Beobachter muss seine eigene Abonnementlebensdauer nicht verwalten. Der Beobachter muss nur auf die Nachricht OnCompleted
/OnError
antworten.
In Ihrem Code würde ich vorschlagen, dass Sie darüber nachdenken, Ihren Code leicht zu ändern. Ich würde erwarten, dass eine CommandReaderPublisher
Klasse mit einer Subscribe
Methode geeigneter wäre als eine CommandReader
Klasse. Sobald ein Rx-Stream abgeschlossen ist, kann er nicht mehr verwendet werden.
Auch ich frage mich, ob Anruf OnCompleted
wäre besser als OnError(exception)
, wenn der zugrunde liegende Stream schließt. Wenn ein Fehler auftritt, ist das in Ordnung, aber wenn es geschlossen wird, dann könnte ONCompleted
besser sein.
+1 Implementieren Sie die IObserver/IObservable-Schnittstellen nicht. Verwenden Sie die Rx-Bibliothek. Observable.Create/Generate/etc ist, wie Sie eine Observable-Sequenz erstellen und die Subscriber-Erweiterungsmethode ist, wie Sie eine beobachtbare Sequenz konsumieren. www.IntroToRx.com. –