2016-12-20 2 views
2

Ich mag ein IEnumerable, IDisposable (Quelle) in ein beobachtbares anzupassen und möchte den besten Weg, wissen, dies zu tun und haben die source.Dispose Methode aufgerufen nach dem Abbestellen.C# Rx Wie man richtig von Quelle entsorgen Enumerable in beobachtbare erstellt

Es gibt eine example auf introtorx.com der Anpassung eines IEnumerable, aber es gibt explizit an, dass es viele Mängel wie falsche Entsorgung Muster, schlechte Nebenläufigkeit Modell, keine Fehlerbehandlung, etc ... und dass die eingebaute Version hat handhabt diese. Aber die eingebaute Version scheint Dispose auf der Quelle IEnumerable nach Abbestellung nicht aufzurufen.

Idealerweise würde ich gerne das Muster .Publish().RefCount() verwenden, um mehrere Abonnenten an derselben Quelle zu haben und nur die Quelle Dispose() aufzurufen, wenn sie alle abgemeldet sind.

Hier sind ist der Code für meinen Versuch, wenn es nicht funktioniert.

static void FromEnumerableTest() { 
    var observable = Observable.Create<int>(
     observer => { 
      var source = new JunkEnumerable(); 
      foreach (int i in source) { 
       observer.OnNext(i); 
      } 
      return() => { 
       source.Dispose(); 
      }; 
     }) 
     .SubscribeOn(Scheduler.Default) 
     .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running 
     .Publish() 
     .RefCount(); 

    //var observable = Observable.ToObservable(new JunkEnumerable()) 
    // .SubscribeOn(Scheduler.Default) 
    // .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running 
    // .Publish() 
    // .RefCount(); 

    Console.WriteLine("Press any key to subscribe"); 
    Console.ReadKey(); 

    var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i)); 
    Console.WriteLine("Press any key to unsubscribe"); 
    Console.ReadKey(); 
    subscription.Dispose(); 

    Console.WriteLine("Press any key to exit"); 
    Console.ReadKey(); 
} 


class JunkEnumerable : IEnumerable<int>, IDisposable { 
    public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); } 

    public IEnumerator<int> GetEnumerator() { return new Enumerator(); } 

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 

    class Enumerator : IEnumerator<int> { 
     private int counter = 0; 
     public int Current { 
      get { 
       Thread.Sleep(1000); 
       return counter++; 
      } 
     } 

     object IEnumerator.Current { get { return Current; } } 

     public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); } 

     public bool MoveNext() { return true; } 

     public void Reset() { counter = 0; } 
    } 
} 
+0

möglich Duplikat http://stackoverflow.com/questions/7322395/creating-a-weak-subscription-to-an -iobservable –

Antwort

1

Es gibt drei Stufen in einer Rx-Abonnement-Lebensdauer:

  1. Abonnement
  2. Beobachtung
  3. Unsubscription

Wenn das Abonnement nicht abgeschlossen ist, die Abmelde Code doesn‘ t passiert. Wenn Sie sich nie vollständig angemeldet haben, warum sollten Sie sich dann abmelden? Ihr Beispielcode hat eine Endlosschleife im Abonnementcode, sodass er nie abgeschlossen wird. Daher wird der Abmeldecode nie vorkommen.

Der normale Weg, um eine IDisposable zu behandeln ist mit Observable.Using. Der normale Umgang mit IEnumerable ist mit .ToObservable. Wenn Sie versuchen, die Asynchronität zu synchronen, zählbaren Code (wie Ihr Beispiel) einführen, können Sie dies wie folgt tun:

var observable = Observable.Using(() => new JunkEnumerable(), junk => 
    Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20)) 
); 

Solange der Timespan ist größer als 15 Millis, wird Rx drehen async, das Abonnement abschließen. Die nachfolgenden Werte sind Teil der Beobachtungsphase und die Abmeldung wird vollständig stattfinden.

+0

Ah! das ist toll. Ich beschönige die Methode "Observable.Generate" in der Dokumentation. [Hier] (http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html#ObservableTimer) ist ein Link für Neugierige. –

1

Hier ist ein Bediener die Aufzählung zu einem bestimmten Scheduler auszuführen. Wir planen jede Aufzählung der Aufzählung, damit die Disposables korrekt zurückgegeben werden können.

public static IObservable<T> ToObservableOn<T>(this IEnumerable<T> source, IScheduler scheduler = default(IScheduler)) 
    { 
     scheduler = scheduler ?? Scheduler.Default; 
     return Observable.Create<T>(
      (observer) => 
      { 
       var disposed = new BooleanDisposable(); 
       var enumerator = source.GetEnumerator(); 

       Action scheduleNext = default(Action); 
       scheduleNext =() => 
       { 
        if (disposed.IsDisposed) 
         return; 

        if (!enumerator.MoveNext()) 
        { 
         observer.OnCompleted(); 
         return; 
        } 

        observer.OnNext(enumerator.Current); 

        scheduler.Schedule(scheduleNext); 
       }; 

       scheduler.Schedule(scheduleNext); 
       return StableCompositeDisposable.Create(disposed, enumerator); 
      }); 
    } 

Von Ihrem Beispiel ändern wir einfach die SubscribeOn zu:

 var observable = 
      new JunkEnumerable() 
      .ToObservableOn(Scheduler.Default)     
      .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running 
      .Publish() 
      .RefCount(); 
Verwandte Themen