2016-05-05 5 views
1

Ich habe mehrere Dienste, die im Wesentlichen Konsolenanwendungen sind, die mit TopShelf gehostet werden, und mit Rebus 0.99.50 kommunizieren. Einer dieser Dienste (StepManager) durchläuft eine Sammlung von Objekten (vom Typ Step), von denen jede eine Bus-Instanz enthält, mit der sie eine Nachricht sendet, und einen Handler, der eine Antwort verarbeitet. Der folgende Schritt (e) für dieses Beispiel verwendet, in dieser Reihenfolge, ist:Probleme, die aus nachfolgenden Rebus-Instanzen publizieren

  • ReceiveFile
  • LogFileMetrics
  • ArchiveIncomingFile

In meinem aktuellen Szenario, ich habe insgesamt 7 Schritt (s) ... Beim Durchlaufen dieser Schritte verhalten sich ReceiveFile und LogFileMetrics wie erwartet. Wenn ArchiveIncomingFile jedoch ausgeführt wird, wird .Send (req) aufgerufen, aber die Nachricht erreicht niemals ihr Ziel, sodass der Prozess auf die Antwort wartet das kommt nie zurück. Unabhängig von der Art des Step-Objekts oder der Reihenfolge der Objekte in der Liste geschieht dies konsistent bei der zweiten Instanz des Typs Step (die eine .Send (req) in der Run() -Methode ausführt) in der Liste. ABER, wenn ich die while (! Completed) {kommandiere Task.Delay (25) auskommentiere; } Anweisungen, die Nachrichten scheinen gesendet zu werden, jedoch ohne diese Anweisungen werden alle Schritte ohne spezifische Ausführungsreihenfolge ausgeführt, was ein Problem darstellt.

Warum passiert das? Was vermisse ich/mache ich hier falsch? Und gibt es eine bessere Alternative, um das zu erreichen, was ich versuche?

Hier sind die relevanten Teile der Klassen in Frage:

public class StepManager 
{ 
    ... 

    public string ProcessName { get; set; } 
    public List<Step> Steps { get; set; } 
    public BuiltinHandlerActivator ServiceBus { get; set; } 

    ... 

    public async Task Init() 
    { 
     ... 

     Steps = new List<Step>(); 
     var process = Db.Processes.Include("Steps") 
           .Where(p => p.Name == ProcessName) 
           .FirstOrDefault(); 
     ... 

     foreach (var s in process.Steps) 
     { 
      var step = container.Resolve<Step>(s.Name); 

      ... 

      Steps.Add(step); 
     }  
    } 

    public async Task Run() 
    { 
     foreach (var step in Steps) 
     { 
      await step.Run(); 
     } 
    } 
} 

public class Step 
{ 
    public BuiltinHandlerActivator ServiceBus { get; set; } 

    public Step() 
    { 
     Db = new ClearStoneConfigContext(); 
     Timer = new Stopwatch(); 
     StepId = Guid.NewGuid().ToString(); 

     Completed = false; 
    } 

    public virtual async Task Run() { } 
} 

public class ReceiveFile : Step 
{ 
    public ReceiveFile() 
    { 
     ServiceBus = new BuiltinHandlerActivator(); 

     Configure.With(ServiceBus) 
       .Logging(l => l.ColoredConsole(LogLevel.Info))      
       .Routing(r => r.TypeBased().Map<ProcessLog>("stepmanager"))      
       .Transport(t => t.UseMsmq("receivefile")) 
       .Start(); 
    } 

    public override async Task Run() 
    { 
     ... 

     LogEntry.Message = "File " + FileEvent.Name + " received.";  
     await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry); 
     Completed = true;   
    } 
} 

public class LogFileMetrics : Step 
{ 
    public LogFileMetrics() 
    { 
     SubscriptionTable = "SandboxServiceBusSubscriptions"; 
     ServiceBus = new BuiltinHandlerActivator(); 

     Configure.With(ServiceBus) 
       .Logging(l => l.ColoredConsole(LogLevel.Info))      
       .Routing(r => r.TypeBased().Map<LogFileMetricsRequest>("metrics"))     
       .Transport(t => t.UseMsmq("logfilemetrics")) 
       .Start(); 

     ServiceBus.Handle<FileMetricsLogged>(async msg=> await FileMetricsLogged(msg));; 
    } 

    public override async Task Run() 
    { 
     ... 

     await ServiceBus.Bus.Send(new LogFileMetricsRequest { ProcessId = ProcessId, FileEvent = FileEvent }).ConfigureAwait(false); 

     while (!Completed) { await Task.Delay(25); }   
    } 

    private async Task FileMetricsLogged(FileMetricsLogged msg) 
    { 
     ... 

     await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry); 
     Completed = true; 
    } 
} 

public class ArchiveIncomingFile : Step 
{ 
    public ArchiveIncomingFile() 
    { 
     SubscriptionTable = "SandboxServiceBusSubscriptions"; 
     ServiceBus = new BuiltinHandlerActivator(); 

     Configure.With(ServiceBus) 
       .Logging(l => l.ColoredConsole(LogLevel.Info))      
       .Routing(r => r.TypeBased().Map<ArchiveIncomingFileRequest>("incomingarchivefilerouter"))      
       .Transport(t => t.UseMsmq("archiveincomingfile"))      
       .Start(); 

     ServiceBus.Handle<IncomingFileArchived>(async msg => await IncomingFileArchived(msg)); 
    } 

    public override async Task Run() 
    { 
     ... 

     ServiceBus.Bus.Send(req); 

     while (!Completed) { await Task.Delay(25); } 
    } 

    private async Task IncomingFileArchived(IncomingFileArchived msg) 
    { 
     ... 

     await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry); 
     Completed = true; 
    } 
} 

Antwort

0

ich einige Probleme mit Ihrem Code sehen kann, obwohl es mir nicht klar ist, was das lustige Verhalten Sie erleben verursacht.

Zunächst scheint es, als ob Sie jedes Mal, wenn Sie Schritte erstellen, neue Businstanzen erstellen. Ist Ihnen bewusst, dass die Businstanz von Rebus beim Start in Ihrer Anwendung einmalig erstellt werden sollte, als Singleton aufbewahrt und ordnungsgemäß entsorgt werden muss, wenn Ihre Anwendung heruntergefahren wird?

Sie können diesen Zyklus so oft wie Sie möchten ausführen, es ist nicht so, als würde Rebus irgendetwas zurücklassen, aber die Tatsache, dass Sie den Bus nirgendwohin bringen, sagt mir, dass Ihre Anwendung wahrscheinlich vergisst dies zu tun.

Sie können mehr über das Rebus-Wiki lesen, besonders im Abschnitt über Rebus' bus instance.

Ein weiteres Problem ist die subtile potenzielle Racebedingung in der ArchiveIncomingFile Klasse, deren Ctor sieht wie folgt aus:

public ArchiveIncomingFile() 
{ 
    SubscriptionTable = "SandboxServiceBusSubscriptions"; 
    ServiceBus = new BuiltinHandlerActivator(); 

    Configure.With(ServiceBus) 
      .Logging(l => l.ColoredConsole(LogLevel.Info))      
      .Routing(r => r.TypeBased().Map<ArchiveIncomingFileRequest>("incomingarchivefilerouter"))      
      .Transport(t => t.UseMsmq("archiveincomingfile"))      
      .Start(); 

    //<<< bus is receiving messages at this point, but there's no handler!! 

    ServiceBus.Handle<IncomingFileArchived>(async msg => await IncomingFileArchived(msg)); 
} 

Wie Sie sehen können, gibt es eine (sehr sehr sehr kurz, zugegebenermaßen) Zeit (gekennzeichnet durch //<<<) in dem der Bus gestartet wurde (und somit beginnt, Nachrichten aus seiner Eingabewarteschlange zu ziehen), in denen noch keine Handler konfiguriert wurden.

Sie sollten sicherstellen, Handler zu konfigurieren, BEVOR Sie den Bus starten.

Schließlich Sie fragen

Und gibt es eine bessere Alternative zu erreichen, was ich tun möchte?

aber ich bin nicht in der Lage, diese Frage zu beantworten, weil ich einfach nicht herausfinden, was Sie zu tun versuchen;)

(aber wenn Sie auf einem etwas höheren Niveau mir erklären, welches Problem Sie versuchen, zu lösen, könnte ich einige Hinweise für Sie haben :))

Verwandte Themen