Ich habe ein System eingerichtet, um Nachrichten aus einer Warteschlange auf unbestimmte Zeit zu lesen, und dann farmt sie mit Rx und TPL DataFlow
.Kontinuierlicher Datenstrom mit DataFlow und RX stoppt die Verarbeitung
Aus irgendeinem Grund, nach ein paar hundert Nachrichten stoppt der ActionBlock läuft und ich kann nicht herausfinden, warum. this.GetMessages()
feuert weiter, aber this.ProcessMessages
nicht mehr.
var source = Observable
.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
.SelectMany(x => this.GetMessages());
var actionBlock = new ActionBlock<List<QueueStream>>(
this.ProcessMessages,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
});
using (source.Subscribe(actionBlock.AsObserver()))
{
while (this.Run)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
actionBlock.Complete();
await actionBlock.Completion;
Der Leser - Beachten Sie, dass dies tatsächlich
private async Task<List<QueueStream>> GetMessages()
{
var messageList = new List<QueueStream>();
var taskList = new List<Task>();
// Add up to N items in our queue
for (var i = 0; i < 25; i++)
{
var task = this
.ReadAndParseQueue()
.ContinueWith(async queueStreamTask =>
{
var queueStream = await queueStreamTask;
if (queueStream != null)
{
messageList.Add(queueStream);
}
});
taskList.Add(task);
}
await Task.WhenAll(taskList);
return messageList;
}
Der Schreiber läuft weiter - nach ein paar hundert Nachrichten hört diese getroffen zu
private async Task ProcessMessages(List<QueueStream> streams)
{
var tasks = new List<Task>();
foreach (var queueStream in streams)
{
tasks.Add(this.ProcessMessage(queueStream));
}
await Task.WhenAll(tasks);
}
Sie müssen wirklich eine [mcve] bereitstellen. Etwas, das wir Code kopieren, einfügen und ausführen können, der dieses Problem reproduziert. – Enigmativity
@Chris was ist der Status des 'ActionBlock' wenn er einmal _hangs_ ist, ist es wahrscheinlich, dass er fehlerhaft ist. Jede Exception, die innerhalb von 'ProcessMessages' ausgelöst wird, wird erst dann beobachtet, wenn Sie' complete' erwarten, was in Ihrem Fall niemals passieren würde, wenn 'this.run' immer gesetzt ist. – JSteward