2017-04-20 1 views
1

Ich habe ein System eingerichtet, um Nachrichten aus einer Warteschlange auf unbestimmte Zeit zu lesen, und dann farmt sie mit Rx und TPL DataFlow.Kontinuierlicher Datenstrom mit DataFlow und RX stoppt die Verarbeitung

Aus irgendeinem Grund, nach ein paar hundert Nachrichten stoppt der ActionBlock läuft und ich kann nicht herausfinden, warum. this.GetMessages() feuert weiter, aber this.ProcessMessages nicht mehr.

var source = Observable 
    .Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1)) 
    .SelectMany(x => this.GetMessages()); 

var actionBlock = new ActionBlock<List<QueueStream>>(
    this.ProcessMessages, 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = Environment.ProcessorCount * 2, 
    }); 

using (source.Subscribe(actionBlock.AsObserver())) 
{ 
    while (this.Run) 
    { 
     await Task.Delay(TimeSpan.FromSeconds(1)); 
    } 
} 

actionBlock.Complete(); 
await actionBlock.Completion; 

Der Leser - Beachten Sie, dass dies tatsächlich

private async Task<List<QueueStream>> GetMessages() 
{ 
    var messageList = new List<QueueStream>(); 
    var taskList = new List<Task>(); 

    // Add up to N items in our queue 
    for (var i = 0; i < 25; i++) 
    { 
     var task = this 
      .ReadAndParseQueue() 
      .ContinueWith(async queueStreamTask => 
       { 
        var queueStream = await queueStreamTask; 
        if (queueStream != null) 
        { 
         messageList.Add(queueStream); 
        } 
       }); 

     taskList.Add(task); 
    } 

    await Task.WhenAll(taskList); 

    return messageList; 
} 

Der Schreiber läuft weiter - nach ein paar hundert Nachrichten hört diese getroffen zu

private async Task ProcessMessages(List<QueueStream> streams) 
{ 
    var tasks = new List<Task>(); 
    foreach (var queueStream in streams) 
    { 
     tasks.Add(this.ProcessMessage(queueStream)); 
    } 

    await Task.WhenAll(tasks); 
} 
+0

Sie müssen wirklich eine [mcve] bereitstellen. Etwas, das wir Code kopieren, einfügen und ausführen können, der dieses Problem reproduziert. – Enigmativity

+0

@Chris was ist der Status des 'ActionBlock' wenn er einmal _hangs_ ist, ist es wahrscheinlich, dass er fehlerhaft ist. Jede Exception, die innerhalb von 'ProcessMessages' ausgelöst wird, wird erst dann beobachtet, wenn Sie' complete' erwarten, was in Ihrem Fall niemals passieren würde, wenn 'this.run' immer gesetzt ist. – JSteward

Antwort

1

Sind Sie sicher, dass Ihr source in diesem Fall weiterläuft? Es gibt eine unendliche Schleife im Code, aber, wenn Fehler auftreten oder this.Run ist nicht gesetzt, hört es, und danach haben Sie die folgenden Zeilen:

actionBlock.Complete(); 
await actionBlock.Completion; 

die tatsächlich die actionBlock verhindert, dass neuen mesasges zu akzeptieren, so ProcessMessages wird nie aufgerufen, da Nachrichten einfach ignoriert werden.

Verwandte Themen