2016-07-17 8 views
0

Wie kann ich die Verarbeitung von DataFlow-Blöcken stoppen, wenn einer der Blöcke die Entscheidung getroffen hat, dass ein Fehler aufgetreten ist, um zu verhindern, dass die nächsten Blöcke ausgeführt werden. Ich dachte, ein Block kann eine Ausnahme auslösen, aber nicht sicher, was der richtige Weg ist, um die weitere Verarbeitungspipeline zu stoppen.Wie kann die Pipeline bei fehlerhaften Blöcken gestoppt werden?

UPDATE:

private async void buttonDataFlow_Click(object sender, EventArgs e) 
{ 
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList(); 
    if (cells == null) 
     return; 

    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare), 
    new ExecutionDataflowBlockOptions 
    { 
     BoundedCapacity = 10000, 
     MaxDegreeOfParallelism = Environment.ProcessorCount, 
    }); 

    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback), 
    new ExecutionDataflowBlockOptions 
    { 
     BoundedCapacity = 10000, 
     MaxDegreeOfParallelism = Environment.ProcessorCount, 
    }); 

    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover), 
    new ExecutionDataflowBlockOptions 
    { 
     BoundedCapacity = 10000, 
     MaxDegreeOfParallelism = Environment.ProcessorCount, 
    }); 

    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true }); 
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true }); 

    foreach (Cell c in cells) 
    { 
     var progressHandler = new Progress<string>(value => 
     { 
      c.Status = value; 
     }); 

     c.Progress = progressHandler as IProgress<string>; 
     blockPrepare.Post(c); 
    }; 

    blockPrepare.Complete(); 
    try 
    { 
     await blockTestMover.Completion; 
    } 
    catch(Exception ee) 
    { 
     Console.WriteLine(ee.Message); 
    } 

    Console.WriteLine("Done"); 
} 

UPDATE 2:

public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
        Func<TInput, Task> action, 
        Action<Exception> exceptionHandler, 
        ExecutionDataflowBlockOptions dataflowBlockOptions) 
    { 
     return new ActionBlock<TInput>(async input => 
     { 
      try 
      { 
       await action(input); 
      } 
      catch (Exception ex) 
      { 
       exceptionHandler(ex); 
      } 
     }, dataflowBlockOptions); 
    } 

Antwort

3

Wenn das, was Sie wollen, dass eine Ausnahme in einem Block die aktuellen Produkte bedeutet in der Pipeline geht weiter, aber die Verarbeitung anderer Gegenstände sollten ohne Unterbrechung fortgesetzt werden, dann können Sie tun das durch Erstellen eines Blocks, der ein Element erzeugt, wenn die Verarbeitung erfolgreich ist, aber 0 Elemente erzeugt, wenn eine Ausnahme ausgelöst wird:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, 
    Action<Exception> exceptionHandler, 
    ExecutionDataflowBlockOptions dataflowBlockOptions) 
{ 
    return new TransformManyBlock<TInput, TOutput>(async input => 
    { 
     try 
     { 
      var result = await transform(input); 
      return new[] { result }; 
     } 
     catch (Exception ex) 
     { 
      exceptionHandler(ex); 

      return Enumerable.Empty<TOutput>(); 
     } 
    }, dataflowBlockOptions); 
} 
+0

Das war genau das, wonach ich suchte! – Pablo

+0

nur eine letzte Sache .. Obwohl ActionBlock der letzte Block ist, habe ich versucht, eine ähnliche Methode zu erstellen, dafür zu machen. Mein Beitrag wurde mit dem Versuch aktualisiert, eine solche Methode zu erstellen. Aber Compiler beklagt sich, dass "Leere" nicht abwarten kann. Ich kann ihn verstehen, aber nicht sicher, wie ich es lösen soll. – Pablo

+0

Die Aktion, die ausgeführt wird, gibt 'Task' zurück. – Pablo

1

Wenn Sie eine Pipeline haben, sind Sie wahrscheinlich bereits PropagateCompletion = true verwenden. Das heißt, wenn ein Block in der Pipeline mit einer Ausnahme ausfällt, werden alle nachfolgenden Blöcke ebenfalls fehlschlagen.

Was bleibt, ist, alle Blöcke zu stoppen, die vor dem Block liegen, der fehlgeschlagen ist. Um dies zu tun, können Sie auf den Completion des letzten Blocks in der Pipeline warten. Wenn dies der Fall ist, scheitern Sie den ersten Block, indem Sie darauf aufrufen. Der Code könnte wie folgt aussehen:

// set up your pipeline 

try 
{ 
    await lastBlock.Completion; 
} 
catch (Exception ex) 
{ 
    ((IDataflowBlock)firstBlock).Fault(ex); 

    throw; // or whatever is appropriate to propagate the exception up 
} 
+0

Dies ist alles abbrechen, auch die Jobs, die nicht fehlerhaft sind. Ich meine, ich schreibe die Anzahl der Jobs, die in der foreach-Schleife pipeliert werden, und alle wurden abgebrochen, wenn einer von ihnen fehlerhaft ist. Die ersten 8 Gegenstände (8, weil ich 8 Kern-CPU habe) laufen, einer von ihnen geht nicht weiter und der Rest wird vervollständigt. Aber abgesehen von diesen 8 Artikeln werden andere nicht mehr verarbeitet. Wenn keine Ausnahme auftritt, werden alle Elemente verarbeitet. Ich habe den Quellcode in meinem Post aktualisiert. – Pablo

+0

@Pablo Ich dachte, das ist, was Sie wollten, "um die weitere Verarbeitung [in der] Pipeline zu stoppen". – svick

+0

Ich dachte, Pipeline ist, wenn ich einen Job mit einigen Eingabedaten blockieren, dann wird der Job zu einem anderen Block weitergeleitet und so weiter, nach verknüpften Blöcken. Wenn ich nur einmal poste, dann wird die Kette der Blöcke wie erwartet abgebrochen. Aber wenn ich mehrfach poste, dann werden alle "Posts" abgebrochen, auch solche, die ohne Ausnahme laufen. Mit anderen Worten, wenn ich Eingabedaten [1, 2, 3, 4] zu einer Kette von Blöcken posten möchte und wenn nur, wenn 2 nach der Ausnahme kommt, dann 1, 3, 4, um den Job wie erwartet zu beenden. Ich habe wahrscheinlich falsche Begriffe benutzt, aber ich bin mir nicht sicher, wie ich besser beschreiben soll. – Pablo

Verwandte Themen