Ich bin sehr neu in Rx und versuche, meinen Kopf darum zu wickeln. Ich habe nicht viel gelesen, aber zuerst versucht, ein Praktikum zu machen.Rx.net - synchrone vs asynchrone Beobachter - hängt von der Quelle ab?
class Program
{
static void Main(string[] args)
{
// one source, produces values with delays
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100));
IObserver<int> handler = null;
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
Dies führt async Interleaved-Ausführung wie erwartet.
Auf der anderen Seite, wenn ich die Quelle ändern, um synchron zu sein, werden auch die Beobachter blockiert und synchron (gleiche Thread-ID, geht nicht zu Sub2, ohne vollständig Sub1 zu verbrauchen). Kann mir jemand helfen, das zu verstehen? Hier ist die Sync-Version
class Program
{
static void Main(string[] args)
{
// one source, produces values
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i);
IObserver<int> handler = null;
// two observers that consume - first with a delay and the second immediately.
// in this case, the behavior of the observers becomes synchronous?
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
Sieht so aus, als ob Sie die Überladungen, die einen Zeitbereich benötigen, den Scheduler 'Scheduler.Default' auswählen, während solche ohne Zeitbereich einen' Scheduler.Immediate' verwenden – Raghu