2016-11-17 3 views
2

Ich bin sehr neu in Rx und versuche, meinen Kopf darum zu wickeln. Ich habe nicht viel gelesen, aber zuerst versucht, ein Praktikum zu machen.Rx.net - synchrone vs asynchrone Beobachter - hängt von der Quelle ab?

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values with delays 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100)); 
     IObserver<int> handler = null; 

     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

Dies führt async Interleaved-Ausführung wie erwartet.

Auf der anderen Seite, wenn ich die Quelle ändern, um synchron zu sein, werden auch die Beobachter blockiert und synchron (gleiche Thread-ID, geht nicht zu Sub2, ohne vollständig Sub1 zu verbrauchen). Kann mir jemand helfen, das zu verstehen? Hier ist die Sync-Version

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i); 
     IObserver<int> handler = null; 

     // two observers that consume - first with a delay and the second immediately. 
     // in this case, the behavior of the observers becomes synchronous? 
     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

Antwort

2

Ich glaube, der Grund dafür ist der ausgewählte Standard IScheduler für den Betreiber. Werfen Sie einen Blick auf die angenommene Antwort here.

Für Generate hängt es von der Überlastung ab. Basierend auf der Antwort sind dies die standardmäßig verwendeten Scheduler. Sie können sie für Quelle überprüfen, ob Sie

  • Standard IScheduler für die zeitliche Operator mag, ist DefaultScheduler.Instance
  • Standard IScheduler für letztere Operator CurrentThreadScheduler.Instance

Sie können dies bestätigen, indem Sie einen „Nicht-Bereitstellung blockieren "Scheduler in Ihrer Synchronisationsversion

IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);

+0

Sieht so aus, als ob Sie die Überladungen, die einen Zeitbereich benötigen, den Scheduler 'Scheduler.Default' auswählen, während solche ohne Zeitbereich einen' Scheduler.Immediate' verwenden – Raghu

Verwandte Themen