2016-06-27 6 views
1

Ich habe zwei einfache beobachtende Handler mit einem Abonnement für die gleiche Quelle. Beide Abonnements arbeiten jedoch auf verschiedenen Arten. Ich möchte, dass sie die Reihenfolge der beobachtbaren Quelle (Subject()) beibehalten. Ich habe es mit der Synchronize() Erweiterung versucht, aber ich habe keine Möglichkeit gefunden, diese Arbeit wie erwartet zu bekommen.Wie synchronisiert man Observables und Offloading UI Thread

Hier ist meine Einheit Test-Code:

[Test] 
public void TestObserveOn() 
{ 
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
    var source = new Subject<object>(); 
    var are = new AutoResetEvent(false); 

    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
     o => 
      { 
       Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
       int sleep = 3000/o; // just to simulate longer processing 
       Thread.Sleep(sleep); 
       Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
      }, 
     () => 
      { 
       Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
       are.Set(); 
      })) 
    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
        o => 
        { 
         Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
         Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
        }, 
        () => 
        { 
         Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
        })) 
    { 
     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     source.OnNext(1); 
     source.OnNext(1.1); 
     source.OnNext(2); 
     source.OnNext(2.1); 
     source.OnNext(3); 
     source.OnNext(3.1); 
     source.OnCompleted(); 

     Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     are.WaitOne(); 
    } 
} 

resultierende Ausgabe des Testcode:

Starting on threadId:10 
Subscribed on threadId:10 
Finished on threadId:10 
Received 1 on threadId:11 
Handled 1 on threadId: 11 
Received 1,1 on threadId:12 
Handled 1,1 on threadId: 12 
Received 2,1 on threadId:12 
Handled 2,1 on threadId: 12 
Received 3,1 on threadId:12 
Handled 3,1 on threadId: 12 
Received 2 on threadId:11 
Handled 2 on threadId: 11 
OnCompleted on threadId:12 
Received 3 on threadId:11 
Handled 3 on threadId: 11 
OnCompleted on threadId:11 

Wie Sie die Bestellung an den Eingang anders sehen kann. Ich möchte beide Abonnements synchronisieren, so dass die Bestellung die gleiche wie für die Eingabe ist.

sollte die Ausgabe

Starting on threadId:10 
Subscribed on threadId:10 
Finished on threadId:10 
Received 1 on threadId:11 
Handled 1 on threadId: 11 
Received 1,1 on threadId:12 
Handled 1,1 on threadId: 12 
Received 2 on threadId:11 
Handled 2 on threadId: 11 
Received 2,1 on threadId:12 
Handled 2,1 on threadId: 12 
Received 3 on threadId:11 
Handled 3 on threadId: 11 
Received 3,1 on threadId:12 
Handled 3,1 on threadId: 12 
OnCompleted on threadId:11 
OnCompleted on threadId:12 

(Completion Ordnung für mich nicht so wichtig ist) sein.

EDIT:

Ich habe auch versucht die folgenden:

[Test] 
public void TestObserveOn() 
{ 
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
    var source = new Subject<object>(); 
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(); 
    var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler); 
    var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory); 
    var are = new AutoResetEvent(false); 

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
     o => 
      { 
       Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
       int sleep = 3000/o; 
       Thread.Sleep(sleep); 
       Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
      }, 
     () => 
      { 
       Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
       are.Set(); 
      })) 
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
        o => 
        { 
         Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
         Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
        }, 
        () => 
        { 
         Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
         are.Set(); 
        })) 
    { 
     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     source.OnNext(1); 
     source.OnNext(1.1); 
     source.OnNext(2); 
     source.OnNext(2.1); 
     source.OnNext(3); 
     source.OnNext(3.1); 
     source.OnCompleted(); 

     Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     are.WaitOne(); 
     are.WaitOne(); 
    } 
} 

Aber der Ausgang ist immer noch falsch:

Starting on threadId:10 
Subscribed on threadId:10 
Finished on threadId:10 
Received 1 on threadId:4 
Handled 1 on threadId: 4 
Received 2 on threadId:4 
Handled 2 on threadId: 4 
Received 3 on threadId:4 
Handled 3 on threadId: 4 
OnCompleted on threadId:4 
Received 1,1 on threadId:4 
Handled 1,1 on threadId: 4 
Received 2,1 on threadId:4 
Handled 2,1 on threadId: 4 
Received 3,1 on threadId:4 
Handled 3,1 on threadId: 4 
OnCompleted on threadId:4 

... wie Sie es sehen können, nicht in die Reihenfolge der OnNext() Aufrufe.

Dies ist besonders wichtig bei der Verwendung von Typen mit einer Bedeutung wie erstellen und dann danach mehrere Updates ... Was tun, wenn das Update vor dem Erstellen ist? Wenn die Bestellung nicht garantiert ist, haben Sie möglicherweise ein Problem oder müssen die "zukünftigen" Ereignisse in eine Warteschlange stellen, bis der Vorgänger mit dem zu ändernden Status synchronisiert ist. Sie benötigen etwas wie eine steigende Version/Bestellnummer, um dies als Bestellkriterium zu verwenden und "Löcher" zu finden und die Nachfolger in eine Warteschlange zu stellen, bis sie wieder in der Schlange stehen.

2. EDIT ... um mehr Nähe zu meinem Problem und raus aus dem Testfall-Theorie:

Ich möchte eine einfache Schnittstelle, die einfach mit RX-Filterfunktion verwenden:

public interface ICommandBus // or to say Aggregator pattern 
{ 
    void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command 

    IObservable<T> Stream<T>() where T : ICommand; 
} 

public class CommandBus : ICommandBus, IDisposable 
{ 
    private static readonly ILog Log = LogManager.GetLogger<CommandBus>(); 

    private readonly HashSet<Type> registrations = new HashSet<Type>(); 

    private readonly Subject<ICommand> stream = new Subject<ICommand>(); 

    private readonly IObservable<ICommand> notifications; 

    private bool disposed; 

    public CommandBus() 
    { 
     // hmm, this is a problem!? how to sync? 
     this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default); 

    } 

    public IObservable<T> Stream<T>() where T : ICommand 
    { 
     var observable = this.notifications.OfType<T>(); 
     return new ExclusiveObservableWrapper<T>(
      observable, 
      t => this.registrations.Add(t), 
      t => this.registrations.Remove(t)); 
    } 

    public void Send<T>(T command) where T : ICommand 
    { 
     if (command == null) 
     { 
      throw new ArgumentNullException("command"); 
     } 

     if (!this.registrations.Contains(typeof(T))) 
     { 
      throw new NoCommandHandlerSubscribedException(); 
     } 

     Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name)); 

     this.stream.OnNext(command); 
    } 

    //public async Task SendAsync<T>(T command) where T : ICommand 
    //{ 
    // if (command == null) 
    // { 
    //  throw new ArgumentNullException("command"); 
    // } 

    // if (!this.registrations.Contains(typeof(T))) 
    // { 
    //  throw new NoCommandHandlerSubscribedException(); 
    // } 

    // Log.Debug(logm => logm("Sending command of type {0}.", typeof(T))); 

    // this.stream.OnNext(command); 

    // await this.stream.Where(item => ReferenceEquals(item, command)); 
    //} 

    public void Dispose() 
    { 
     this.Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    protected virtual void Dispose(bool disposing) 
    { 
     if (!this.disposed) 
     { 
      if (disposing) 
      { 
       this.stream.Dispose(); 
      } 
     } 

     this.disposed = true; 
    } 

    [Serializable] 
    public class CommandAlreadySubscribedException : Exception 
    { 
     internal CommandAlreadySubscribedException(Type type) 
      : base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type)) 
     { 
     } 

     protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context) 
      : base(info, context) 
     { 
     } 
    } 

    [Serializable] 
    public class NoCommandHandlerSubscribedException : Exception 
    { 
     public NoCommandHandlerSubscribedException() 
     { 
     } 

     public NoCommandHandlerSubscribedException(string message) 
      : base(message) 
     { 
     } 

     public NoCommandHandlerSubscribedException(string message, Exception innerException) 
      : base(message, innerException) 
     { 
     } 

     protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context) 
      : base(info, context) 
     { 
     } 
    } 

    private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand 
    { 
     private readonly IObservable<T> observable; 

     private readonly Func<Type, bool> register; 

     private readonly Action<Type> unregister; 

     internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister) 
     { 
      this.observable = observable; 
      this.register = register; 
      this.unregister = unregister; 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      var subscription = this.observable.Subscribe(observer); 
      var type = typeof(T); 

      if (!this.register(type)) 
      { 
       observer.OnError(new CommandAlreadySubscribedException(type)); 
      } 

      return Disposable.Create(
       () => 
       { 
        subscription.Dispose(); 
        this.unregister(type); 
       }); 
     } 
    } 
} 

Wenn ich nicht garantieren kann, dass die Befehle in der Reihenfolge (wie angegeben) sind, dann haben sie (möglicherweise) keinen Sinn. (update before create)

Der ICommandBus wird vom UI/Presentation-Layer verwendet, der den entsprechenden Handler für den Befehl aufrufen möchte (ohne den Handler zu kennen).

Ich möchte einfach die Kette zu einem separaten Thread entladen.

Befehl -> Bus -> Befehl Handler -> Domain Model -> Ereignis -> Event-Handler -> liest Modell

Diese Befehle in der Reihenfolge des Erscheinens zu halten braucht.

Ich dachte RX ist in der Lage das mit nur ein paar "magischen Linien" zu machen. Aber soweit ich das jetzt sehen kann, muss ich es nochmal mit eigener Threadbehandlung machen. :-(

+0

Ich bin neu in RX. Ich hoffte, dass es eine Möglichkeit gibt, es für das Offloading zu verwenden, aber die Serie auch bei mehreren Abonnenten beibehalten wird. – Beachwalker

+0

Nicht, wenn Sie einen Scheduler verwenden, der Nebenläufigkeit ermöglicht. Versuchen Sie stattdessen "EventLoopScheduler". – Enigmativity

Antwort

2

Sie scheinen falsch zu verstehen, was .Synchronize() tut. Sein einziger Zweck besteht darin, ein Observable zu nehmen, das überlappende oder veraltete Nachrichten erzeugt (d. H. A OnCompleted vor einem OnNext oder multiple OnError) und sicherzustellen, dass sie dem Verhaltensvertrag OnNext*(OnError|OnCompleted) folgen. Es geht darum, ein Rogue beobachtbares Spiel schön zu machen.

Jetzt, da wir das ignorieren können, da Ihre Beispieleingabe eine gut benannte beobachtbare ist, dann können Sie sehen, indem Sie .ObserveOn(TaskPoolScheduler.Default) anrufen, machen Sie Ihre beobachtbaren Sprungfäden - die leicht zu Observablen führen können, die mit verschiedenen Raten verbraucht werden - was hier passiert.

Sie haben die source zweimal abonniert, so dass Sie das Verhalten nicht stoppen können, das Sie angesichts der Art sehen, wie Sie Nebenläufigkeit einführen.

Angesichts Ihrer vorherigen Frage (How to await finished IObserver call including observing subscriber calls?) scheinen Sie höllisch auf Rx zu verwenden, um Nebenläufigkeit hinzuzufügen, aber dann zwingen Sie es irgendwie, es zu entfernen. Du solltest wirklich in der Denkweise sein, Rx zu entfesseln, um sein Ding zu tun und es nicht zu verschlingen.

EDIT von @Beachwalker:

Enigmativity gab die richtige Antwort auf meine Frage in seinem Kommentar zu dieser Antwort.

Ich muss die EventLoopScheduler verwenden. Also akzeptiere ich das als richtige Antwort.

Der Vollständigkeit halber.Hier ist der Code, der funktioniert:

[Test] 
public void TestObserveOn() 
{ 
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
    var source = new Subject<object>(); 
    var exclusiveScheduler = new EventLoopScheduler(); 
    var are = new AutoResetEvent(false); 

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
     o => 
      { 
       Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
       int sleep = 3000/o; 
       Thread.Sleep(sleep); 
       Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
      }, 
     () => 
      { 
       Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
       are.Set(); 
      })) 
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
        o => 
         { 
          Console.WriteLine(
           "Received {1} on threadId:{0}", 
           Thread.CurrentThread.ManagedThreadId, 
           o); 
          Console.WriteLine(
           "Handled {1} on threadId: {0}", 
           Thread.CurrentThread.ManagedThreadId, 
           o); 
         }, 
        () => 
        { 
         Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
         are.Set(); 
        })) 
    { 
     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     source.OnNext(1); 
     source.OnNext(1.1); 
     source.OnNext(2); 
     source.OnNext(2.1); 
     source.OnNext(3); 
     source.OnNext(3.1); 
     source.OnCompleted(); 

     Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     are.WaitOne(); 
     are.WaitOne(); 
    } 
} 
+0

Also bin ich gezwungen, meine eigene Thread-Verwaltung um RX zu legen, um von der UI-Aufgabe zu entladen? Hmm, enttäuschend. Ich dachte, RX könnte auch in diesem typischen Fall helfen. Es scheint, @VMAtm ist nah an einer Lösung. Ich möchte das Ganze nur auf einen anderen Thread verschieben. Ich kann nicht glauben, dass es keine Lösung für dieses Problem gibt (was in "Old School Coding" einfach ist, aber zu viel Code aus meiner Sicht benötigt). – Beachwalker

+0

@Beachwalker - Nein, überhaupt nicht. Rx eignet sich hervorragend zum Entfernen von Threading-Problemen. Ihr Beispielcode ist sehr abstrakt. Was versuchst du wirklich? – Enigmativity

+0

Danke für den Versuch zu helfen. Ich habe meine Frage präzisiert. – Beachwalker

1

Sie erstellen eine zwei unterschiedliche Aufgaben für Ihren source, basierend auf Filter nach Art der Quelle nächsten Mitglied.

Sie verarbeiten die Nachrichten parallel, wie Sie aus den Thread-IDs sehen können. Diese bietet Ihnen die bessere Leistung, aber bieten Ihnen keine Garantie für eine Reihenfolge der Handhabung der source. Also, wenn Sie eine sequentielle Handle für Ihre Objekte benötigen, müssen Sie entweder Ihren Code für eine sequenzielle Ausführung neu schreiben (was Ihre Leistung beeinträchtigen wird) oder um andere Scheduler für Testzwecke zu verwenden

Derzeit verwenden Sie TaskPoolScheduler.Default, die einfach den Standard-Thread-Pool verwenden. So können Sie a new scheduler bereitstellen. Sie können es eine neue Implementierung von selbst bereitstellen, aber ich denke, dass der einfachste Weg ist, ConcurrentExclusiveSchedulerPair Klasse zu verwenden, um einen exklusiven Scheduler zur Verfügung zu stellen, um Ihre source in der gleichen Reihenfolge, die Sie eine Werte bereitstellen.

Ihr Code könnte so etwas wie dieses:

var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(); 
var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler); 
var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory); 
using (source.ObserveOn(exclusiveScheduler)... 

aktualisieren:

Wie in anderen Post gesagt, der richtige Weg, Ereignisse zu behandeln wie die EventLoopScheduler Klasse.

+0

Danke für Ihre Hilfe. Ich habe deinen Vorschlag ausprobiert ... aber es hat nicht wie erwartet funktioniert. Siehe meine Bearbeitung. – Beachwalker

+0

Oh, das ist unerwartet für mich. Es scheint also, dass Sie entweder zusätzliche Logik schreiben müssen, um die Beobachter zu synchronisieren. Haben Sie versucht, den Methodenaufruf 'Synchronize' hinzuzufügen? – VMAtm

+0

Ja, aber es scheint etwas nicht zu beeinflussen. @Enigmatistik sagte auch, Synchronize() ist nicht das richtige Werkzeug dafür. (Obwohl ich nicht weiß, warum Sie OnNext() und OnError() in einem beobachtbaren Stream neu anordnen sollten, wie er als Beispiel gab). – Beachwalker

Verwandte Themen