Ihre Observable ist nicht heiß. Es ist eine Kaltbeobachtung mit einer gemeinsamen Quelle und es bewirkt nur, dass sich die nachfolgenden Beobachter so verhalten, als ob sie eine heiße Observable hätten. Es ist wahrscheinlich am besten als warm beobachtbar beschrieben.
auf ein Beispiel Werfen wir einen Blick:
var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount();
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.Publish()
.RefCount()
.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });
Als ich das laufen erhalte ich:
A
A
B
C
A
B
C
E
E
E
Die "B" & "C" Beobachter der erste Wert der Folge zu verpassen.
Und nachdem die Beobachter "A", "B" und "C" fertig sind, ist die Sequenz beendet, so dass "D" niemals einen Wert erhält. Ich musste ein brandneues Observable erstellen, um die Werte "E" anzuzeigen.
Also, in Ihrem Code haben Sie ein Problem, wenn der erste Beobachter einen oder mehrere Werte vor dem zweiten und dritten Subskribenten beendet, dann verpassen diese Beobachter Werte. Ist es das was du willst?
Trotzdem fragt Ihre Frage, wie man mit Einwegwerten umgehen soll, die von einer Observablen zurückgegeben werden. Es ist einfach, wenn Sie Observable.Using
verwenden. nie
A
B
C
Disposed!
A
B
C
Disposed!
A
B
C
Disposed!
Again "D":
Hier ist eine ähnliche Situation wie Ihr Code:
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)))
.Publish()
.RefCount();
}
Nun, wenn ich diesen Code ausführen:
var query = ImagesInFolder(Scheduler.Default);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Ich erhalte diese Ausgabe produziert irgendwelche Werte - und es ist für "B" & "C" möglich, Werte zu verpassen, aber dies zeigt, wie man zurückkehrt ein beobachtbarer Wert, der automatisch mit dem/den Beobachter/n beseitigt wird, ist/sind beendet.
Ihr Code würde wie folgt aussehen:
public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish()
.RefCount();
}
aber sie ist immer noch im Land der möglicherweise fehlende Werte.
Deshalb brauchen Sie wirklich, dies zu tun:
public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish();
}
Dann sind Sie es so nennen:
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
images.Connect();
}
Die andere Option ist die ganze .Publish().RefCount()
Code fallen zu lassen und sicherzustellen, dass Sie es richtig selbst wenn Sie abonnieren.
Versuchen Sie diesen Code:
void Main()
{
ImagesInFolder(Scheduler.Default)
.Publish(iif =>
Observable
.Merge(
iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
.Subscribe();
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
ich diese heraus:
A
B
C
Disposed!
A
B
C
Disposed!
A
B
C
Disposed!
Wieder ein Disposed!
nach jedem Beobachter ausgeführt wird, aber das Problem ist jetzt, dass ich die Verzögerung bei der Verarbeitung geändert von jedem Beobachter, aber der Code gibt immer noch die Reihenfolge aus, in der die Beobachter hinzugefügt wurden. Das Problem besteht darin, dass Rx jeden Beobachter der Reihe nach ausführt und jeder erzeugte Wert der Reihe nach ist.
Ich vermute, dass Sie dachten, Sie könnten parallele Verarbeitung mit .Publish()
erhalten. Du nicht.
Der Weg, um dies parallel zu laufen, ist die .Publish()
ganz zu löschen.
tun Gerade diese Art der Sache:
void Main()
{
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); });
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
ich das jetzt bekommen:
A
Disposed!
C
Disposed!
A
Disposed!
B
Disposed!
A
Disposed!
C
Disposed!
C
Disposed!
B
Disposed!
B
Disposed!
Der Code nun parallel läuft und endet so schnell wie möglich - und richtig verfügt über die IDisposable
wenn Das Abonnement endet. Sie haben einfach nicht die Freude, mit jedem Beobachter eine einzige verfügbare Ressource zu teilen, aber Sie bekommen auch nicht alle Verhaltensprobleme.
omg, das ist die beste Antwort, die ich mir vorstellen kann! vielen Dank! – Dmitry