2017-05-17 2 views
1

Ich würde gerne Best Practices auf heißen observable und IDisposable Objekte als Ereignistyp finden.Hot Observable und IDisposable

Nehmen Sie an, dass mein Code Bitmap-Objekte als Hot Observable erzeugt und ich mehrere Abonnenten habe. Zum Beispiel:

public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler) 
    { 
     return Directory.GetFiles(path, "*.bmp") 
      .ToObservable(scheduler) 
      .Select(x => new Bitmap(x)) 
      .Publish() 
      .RefCount(); 
    } 

public void Main() 
{ 
    var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance); 
    var process1 = images.Subscribe(SaveBwImages); 
    var process2 = images.Subscribe(SaveScaledImages); 
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages); 
} 

So ist die Frage: Was sind die besten Praktiken ist Einweg Ressourcen zu behandeln, die Quelle eines heißen beobachtbar sind?

In diesem Beispiel möchte ich Bilder nach dem Gebrauch entsorgen, aber ich kann nicht herausfinden - wenn genau?

Das ist nicht offensichtlich, in welcher Reihenfolge abonniert Ereignisse aufgerufen werden, so kann ich nicht über eine "letzte" verfügen.

Vielen Dank im Voraus.

Antwort

3

Ihre Observable ist nicht heiß. Es ist eine Kaltbeobachtung mit einer gemeinsamen Quelle und es bewirkt nur, dass sich die nachfolgenden Beobachter so verhalten, als ob sie eine heiße Observable hätten. Es ist wahrscheinlich am besten als warm beobachtbar beschrieben.

auf ein Beispiel Werfen wir einen Blick:

var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount(); 

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); 
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); }); 
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); }); 

Thread.Sleep(10000); 

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); }); 

Observable 
    .Range(0, 3) 
    .ObserveOn(Scheduler.Default) 
    .Publish() 
    .RefCount() 
    .Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); }); 

Als ich das laufen erhalte ich:

 
A 
A 
B 
C 
A 
B 
C 
E 
E 
E 

Die "B" & "C" Beobachter der erste Wert der Folge zu verpassen.

Und nachdem die Beobachter "A", "B" und "C" fertig sind, ist die Sequenz beendet, so dass "D" niemals einen Wert erhält. Ich musste ein brandneues Observable erstellen, um die Werte "E" anzuzeigen.

Also, in Ihrem Code haben Sie ein Problem, wenn der erste Beobachter einen oder mehrere Werte vor dem zweiten und dritten Subskribenten beendet, dann verpassen diese Beobachter Werte. Ist es das was du willst?

Trotzdem fragt Ihre Frage, wie man mit Einwegwerten umgehen soll, die von einer Observablen zurückgegeben werden. Es ist einfach, wenn Sie Observable.Using verwenden. nie

 
A 
B 
C 
Disposed! 
A 
B 
C 
Disposed! 
A 
B 
C 
Disposed! 

Again "D":

Hier ist eine ähnliche Situation wie Ihr Code:

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) 
{ 
    return 
     Observable 
      .Range(0, 3) 
      .ObserveOn(Scheduler.Default) 
      .SelectMany(x => 
       Observable 
        .Using(
         () => Disposable.Create(() => Console.WriteLine("Disposed!")), 
         y => Observable.Return(y))) 
     .Publish() 
     .RefCount(); 
} 

Nun, wenn ich diesen Code ausführen:

var query = ImagesInFolder(Scheduler.Default); 

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); 
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); }); 
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); }); 

Thread.Sleep(10000); 

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); }); 

Ich erhalte diese Ausgabe produziert irgendwelche Werte - und es ist für "B" & "C" möglich, Werte zu verpassen, aber dies zeigt, wie man zurückkehrt ein beobachtbarer Wert, der automatisch mit dem/den Beobachter/n beseitigt wird, ist/sind beendet.

Ihr Code würde wie folgt aussehen:

public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler) 
{ 
    return 
     Directory 
      .GetFiles(path, "*.bmp") 
      .ToObservable(scheduler) 
      .SelectMany(x => 
       Observable 
        .Using(
         () => new System.Drawing.Bitmap(x), 
         bm => Observable.Return(bm))) 
     .Publish() 
     .RefCount(); 
} 

aber sie ist immer noch im Land der möglicherweise fehlende Werte.

Deshalb brauchen Sie wirklich, dies zu tun:

public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler) 
{ 
    return 
     Directory 
      .GetFiles(path, "*.bmp") 
      .ToObservable(scheduler) 
      .SelectMany(x => 
       Observable 
        .Using(
         () => new System.Drawing.Bitmap(x), 
         bm => Observable.Return(bm))) 
      .Publish(); 
} 

Dann sind Sie es so nennen:

public void Main() 
{ 
    var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance); 
    var process1 = images.Subscribe(SaveBwImages); 
    var process2 = images.Subscribe(SaveScaledImages); 
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages); 
    images.Connect(); 
} 

Die andere Option ist die ganze .Publish().RefCount() Code fallen zu lassen und sicherzustellen, dass Sie es richtig selbst wenn Sie abonnieren.

Versuchen Sie diesen Code:

void Main() 
{ 
    ImagesInFolder(Scheduler.Default) 
     .Publish(iif => 
      Observable 
       .Merge(
        iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }), 
        iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }), 
        iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; }))) 
     .Subscribe(); 
} 

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) 
{ 
    return 
     Observable 
      .Range(0, 3) 
      .ObserveOn(Scheduler.Default) 
      .SelectMany(x => 
       Observable 
        .Using(
         () => Disposable.Create(() => Console.WriteLine("Disposed!")), 
         y => Observable.Return(y))); 
} 

ich diese heraus:

 
A 
B 
C 
Disposed! 
A 
B 
C 
Disposed! 
A 
B 
C 
Disposed! 

Wieder ein Disposed! nach jedem Beobachter ausgeführt wird, aber das Problem ist jetzt, dass ich die Verzögerung bei der Verarbeitung geändert von jedem Beobachter, aber der Code gibt immer noch die Reihenfolge aus, in der die Beobachter hinzugefügt wurden. Das Problem besteht darin, dass Rx jeden Beobachter der Reihe nach ausführt und jeder erzeugte Wert der Reihe nach ist.

Ich vermute, dass Sie dachten, Sie könnten parallele Verarbeitung mit .Publish() erhalten. Du nicht.

Der Weg, um dies parallel zu laufen, ist die .Publish() ganz zu löschen.

tun Gerade diese Art der Sache:

void Main() 
{ 
    ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); 
    ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); }); 
    ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); }); 
} 

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) 
{ 
    return 
     Observable 
      .Range(0, 3) 
      .ObserveOn(Scheduler.Default) 
      .SelectMany(x => 
       Observable 
        .Using(
         () => Disposable.Create(() => Console.WriteLine("Disposed!")), 
         y => Observable.Return(y))); 
} 

ich das jetzt bekommen:

 
A 
Disposed! 
C 
Disposed! 
A 
Disposed! 
B 
Disposed! 
A 
Disposed! 
C 
Disposed! 
C 
Disposed! 
B 
Disposed! 
B 
Disposed! 

Der Code nun parallel läuft und endet so schnell wie möglich - und richtig verfügt über die IDisposable wenn Das Abonnement endet. Sie haben einfach nicht die Freude, mit jedem Beobachter eine einzige verfügbare Ressource zu teilen, aber Sie bekommen auch nicht alle Verhaltensprobleme.

+0

omg, das ist die beste Antwort, die ich mir vorstellen kann! vielen Dank! – Dmitry