Ich habe mehrere Dienste, die im Wesentlichen Konsolenanwendungen sind, die mit TopShelf gehostet werden, und mit Rebus 0.99.50 kommunizieren. Einer dieser Dienste (StepManager) durchläuft eine Sammlung von Objekten (vom Typ Step), von denen jede eine Bus-Instanz enthält, mit der sie eine Nachricht sendet, und einen Handler, der eine Antwort verarbeitet. Der folgende Schritt (e) für dieses Beispiel verwendet, in dieser Reihenfolge, ist:Probleme, die aus nachfolgenden Rebus-Instanzen publizieren
- ReceiveFile
- LogFileMetrics
- ArchiveIncomingFile
In meinem aktuellen Szenario, ich habe insgesamt 7 Schritt (s) ... Beim Durchlaufen dieser Schritte verhalten sich ReceiveFile und LogFileMetrics wie erwartet. Wenn ArchiveIncomingFile jedoch ausgeführt wird, wird .Send (req) aufgerufen, aber die Nachricht erreicht niemals ihr Ziel, sodass der Prozess auf die Antwort wartet das kommt nie zurück. Unabhängig von der Art des Step-Objekts oder der Reihenfolge der Objekte in der Liste geschieht dies konsistent bei der zweiten Instanz des Typs Step (die eine .Send (req) in der Run() -Methode ausführt) in der Liste. ABER, wenn ich die while (! Completed) {kommandiere Task.Delay (25) auskommentiere; } Anweisungen, die Nachrichten scheinen gesendet zu werden, jedoch ohne diese Anweisungen werden alle Schritte ohne spezifische Ausführungsreihenfolge ausgeführt, was ein Problem darstellt.
Warum passiert das? Was vermisse ich/mache ich hier falsch? Und gibt es eine bessere Alternative, um das zu erreichen, was ich versuche?
Hier sind die relevanten Teile der Klassen in Frage:
public class StepManager
{
...
public string ProcessName { get; set; }
public List<Step> Steps { get; set; }
public BuiltinHandlerActivator ServiceBus { get; set; }
...
public async Task Init()
{
...
Steps = new List<Step>();
var process = Db.Processes.Include("Steps")
.Where(p => p.Name == ProcessName)
.FirstOrDefault();
...
foreach (var s in process.Steps)
{
var step = container.Resolve<Step>(s.Name);
...
Steps.Add(step);
}
}
public async Task Run()
{
foreach (var step in Steps)
{
await step.Run();
}
}
}
public class Step
{
public BuiltinHandlerActivator ServiceBus { get; set; }
public Step()
{
Db = new ClearStoneConfigContext();
Timer = new Stopwatch();
StepId = Guid.NewGuid().ToString();
Completed = false;
}
public virtual async Task Run() { }
}
public class ReceiveFile : Step
{
public ReceiveFile()
{
ServiceBus = new BuiltinHandlerActivator();
Configure.With(ServiceBus)
.Logging(l => l.ColoredConsole(LogLevel.Info))
.Routing(r => r.TypeBased().Map<ProcessLog>("stepmanager"))
.Transport(t => t.UseMsmq("receivefile"))
.Start();
}
public override async Task Run()
{
...
LogEntry.Message = "File " + FileEvent.Name + " received.";
await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry);
Completed = true;
}
}
public class LogFileMetrics : Step
{
public LogFileMetrics()
{
SubscriptionTable = "SandboxServiceBusSubscriptions";
ServiceBus = new BuiltinHandlerActivator();
Configure.With(ServiceBus)
.Logging(l => l.ColoredConsole(LogLevel.Info))
.Routing(r => r.TypeBased().Map<LogFileMetricsRequest>("metrics"))
.Transport(t => t.UseMsmq("logfilemetrics"))
.Start();
ServiceBus.Handle<FileMetricsLogged>(async msg=> await FileMetricsLogged(msg));;
}
public override async Task Run()
{
...
await ServiceBus.Bus.Send(new LogFileMetricsRequest { ProcessId = ProcessId, FileEvent = FileEvent }).ConfigureAwait(false);
while (!Completed) { await Task.Delay(25); }
}
private async Task FileMetricsLogged(FileMetricsLogged msg)
{
...
await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry);
Completed = true;
}
}
public class ArchiveIncomingFile : Step
{
public ArchiveIncomingFile()
{
SubscriptionTable = "SandboxServiceBusSubscriptions";
ServiceBus = new BuiltinHandlerActivator();
Configure.With(ServiceBus)
.Logging(l => l.ColoredConsole(LogLevel.Info))
.Routing(r => r.TypeBased().Map<ArchiveIncomingFileRequest>("incomingarchivefilerouter"))
.Transport(t => t.UseMsmq("archiveincomingfile"))
.Start();
ServiceBus.Handle<IncomingFileArchived>(async msg => await IncomingFileArchived(msg));
}
public override async Task Run()
{
...
ServiceBus.Bus.Send(req);
while (!Completed) { await Task.Delay(25); }
}
private async Task IncomingFileArchived(IncomingFileArchived msg)
{
...
await ServiceBus.Bus.Advanced.Routing.Send("stepmanager", LogEntry);
Completed = true;
}
}