Ich habe zwei einfache beobachtende Handler mit einem Abonnement für die gleiche Quelle. Beide Abonnements arbeiten jedoch auf verschiedenen Arten. Ich möchte, dass sie die Reihenfolge der beobachtbaren Quelle (Subject()) beibehalten. Ich habe es mit der Synchronize() Erweiterung versucht, aber ich habe keine Möglichkeit gefunden, diese Arbeit wie erwartet zu bekommen.Wie synchronisiert man Observables und Offloading UI Thread
Hier ist meine Einheit Test-Code:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var are = new AutoResetEvent(false);
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000/o; // just to simulate longer processing
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
}
}
resultierende Ausgabe des Testcode:
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled 1 on threadId: 11
Received 1,1 on threadId:12
Handled 1,1 on threadId: 12
Received 2,1 on threadId:12
Handled 2,1 on threadId: 12
Received 3,1 on threadId:12
Handled 3,1 on threadId: 12
Received 2 on threadId:11
Handled 2 on threadId: 11
OnCompleted on threadId:12
Received 3 on threadId:11
Handled 3 on threadId: 11
OnCompleted on threadId:11
Wie Sie die Bestellung an den Eingang anders sehen kann. Ich möchte beide Abonnements synchronisieren, so dass die Bestellung die gleiche wie für die Eingabe ist.
sollte die Ausgabe
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled 1 on threadId: 11
Received 1,1 on threadId:12
Handled 1,1 on threadId: 12
Received 2 on threadId:11
Handled 2 on threadId: 11
Received 2,1 on threadId:12
Handled 2,1 on threadId: 12
Received 3 on threadId:11
Handled 3 on threadId: 11
Received 3,1 on threadId:12
Handled 3,1 on threadId: 12
OnCompleted on threadId:11
OnCompleted on threadId:12
(Completion Ordnung für mich nicht so wichtig ist) sein.
EDIT:
Ich habe auch versucht die folgenden:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler);
var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory);
var are = new AutoResetEvent(false);
using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000/o;
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
are.WaitOne();
}
}
Aber der Ausgang ist immer noch falsch:
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:4
Handled 1 on threadId: 4
Received 2 on threadId:4
Handled 2 on threadId: 4
Received 3 on threadId:4
Handled 3 on threadId: 4
OnCompleted on threadId:4
Received 1,1 on threadId:4
Handled 1,1 on threadId: 4
Received 2,1 on threadId:4
Handled 2,1 on threadId: 4
Received 3,1 on threadId:4
Handled 3,1 on threadId: 4
OnCompleted on threadId:4
... wie Sie es sehen können, nicht in die Reihenfolge der OnNext() Aufrufe.
Dies ist besonders wichtig bei der Verwendung von Typen mit einer Bedeutung wie erstellen und dann danach mehrere Updates ... Was tun, wenn das Update vor dem Erstellen ist? Wenn die Bestellung nicht garantiert ist, haben Sie möglicherweise ein Problem oder müssen die "zukünftigen" Ereignisse in eine Warteschlange stellen, bis der Vorgänger mit dem zu ändernden Status synchronisiert ist. Sie benötigen etwas wie eine steigende Version/Bestellnummer, um dies als Bestellkriterium zu verwenden und "Löcher" zu finden und die Nachfolger in eine Warteschlange zu stellen, bis sie wieder in der Schlange stehen.
2. EDIT ... um mehr Nähe zu meinem Problem und raus aus dem Testfall-Theorie:
Ich möchte eine einfache Schnittstelle, die einfach mit RX-Filterfunktion verwenden:
public interface ICommandBus // or to say Aggregator pattern
{
void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command
IObservable<T> Stream<T>() where T : ICommand;
}
public class CommandBus : ICommandBus, IDisposable
{
private static readonly ILog Log = LogManager.GetLogger<CommandBus>();
private readonly HashSet<Type> registrations = new HashSet<Type>();
private readonly Subject<ICommand> stream = new Subject<ICommand>();
private readonly IObservable<ICommand> notifications;
private bool disposed;
public CommandBus()
{
// hmm, this is a problem!? how to sync?
this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default);
}
public IObservable<T> Stream<T>() where T : ICommand
{
var observable = this.notifications.OfType<T>();
return new ExclusiveObservableWrapper<T>(
observable,
t => this.registrations.Add(t),
t => this.registrations.Remove(t));
}
public void Send<T>(T command) where T : ICommand
{
if (command == null)
{
throw new ArgumentNullException("command");
}
if (!this.registrations.Contains(typeof(T)))
{
throw new NoCommandHandlerSubscribedException();
}
Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name));
this.stream.OnNext(command);
}
//public async Task SendAsync<T>(T command) where T : ICommand
//{
// if (command == null)
// {
// throw new ArgumentNullException("command");
// }
// if (!this.registrations.Contains(typeof(T)))
// {
// throw new NoCommandHandlerSubscribedException();
// }
// Log.Debug(logm => logm("Sending command of type {0}.", typeof(T)));
// this.stream.OnNext(command);
// await this.stream.Where(item => ReferenceEquals(item, command));
//}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.stream.Dispose();
}
}
this.disposed = true;
}
[Serializable]
public class CommandAlreadySubscribedException : Exception
{
internal CommandAlreadySubscribedException(Type type)
: base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type))
{
}
protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
[Serializable]
public class NoCommandHandlerSubscribedException : Exception
{
public NoCommandHandlerSubscribedException()
{
}
public NoCommandHandlerSubscribedException(string message)
: base(message)
{
}
public NoCommandHandlerSubscribedException(string message, Exception innerException)
: base(message, innerException)
{
}
protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand
{
private readonly IObservable<T> observable;
private readonly Func<Type, bool> register;
private readonly Action<Type> unregister;
internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister)
{
this.observable = observable;
this.register = register;
this.unregister = unregister;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = this.observable.Subscribe(observer);
var type = typeof(T);
if (!this.register(type))
{
observer.OnError(new CommandAlreadySubscribedException(type));
}
return Disposable.Create(
() =>
{
subscription.Dispose();
this.unregister(type);
});
}
}
}
Wenn ich nicht garantieren kann, dass die Befehle in der Reihenfolge (wie angegeben) sind, dann haben sie (möglicherweise) keinen Sinn. (update before create)
Der ICommandBus wird vom UI/Presentation-Layer verwendet, der den entsprechenden Handler für den Befehl aufrufen möchte (ohne den Handler zu kennen).
Ich möchte einfach die Kette zu einem separaten Thread entladen.
Befehl -> Bus -> Befehl Handler -> Domain Model -> Ereignis -> Event-Handler -> liest Modell
Diese Befehle in der Reihenfolge des Erscheinens zu halten braucht.
Ich dachte RX ist in der Lage das mit nur ein paar "magischen Linien" zu machen. Aber soweit ich das jetzt sehen kann, muss ich es nochmal mit eigener Threadbehandlung machen. :-(
Ich bin neu in RX. Ich hoffte, dass es eine Möglichkeit gibt, es für das Offloading zu verwenden, aber die Serie auch bei mehreren Abonnenten beibehalten wird. – Beachwalker
Nicht, wenn Sie einen Scheduler verwenden, der Nebenläufigkeit ermöglicht. Versuchen Sie stattdessen "EventLoopScheduler". – Enigmativity