2016-11-22 6 views
4

.Net TPL Experten,Benachrichtigen Aufgabe, wenn andere Aufgaben vollständig

Hinweis: Kann nicht Dataflow-Bibliothek verwenden; keine Add-ons erlaubt.

Ich habe vier Aufgaben, wie unten in der Abbildung dargestellt:

enter image description here

  • Task_1 (data_producer) -> liest Datensätze aus einer großen Datei (> 500000 Datensätze) und fügt Datensätze in eine Blocking

  • task_2, task_3 (data_consumers) -> Jede dieser Aufgaben nimmt Datensätze aus der BlockingCollection. Jede Task führt einige Arbeiten an dem aus der BlockingCollection entnommenen Datensatz aus (netzwerkbezogen). Nach Abschluss kann jede Task der Ergebnisliste einen Datensatz hinzufügen. Reihenfolge der Verarbeitung ist NICHT wichtig.

  • task_4 (Ergebnisprozessor) -> Nimmt Datensätze aus result_queue und schreibt in eine Ausgabedatei.

ich dann warten, bis die Aufgaben zu erledigen, d.h .:

Task.WhenAll(t1, t2, t3, t4) 

So habe ich eine Produzenten Aufgabe, MEHR Verbraucher Aufgaben und eine Aufgabe für die Ergebnisse zu speichern.

Meine Frage ist:

Wie kann ich mitteilen Aufgabe 4, wenn Aufgaben 2 und 3 abgeschlossen sind, so dass auch Aufgabe 4 weiß, wann zu beenden?

Ich habe viele Beispiele gefunden, die Daten von EINER Aufgabe auf eine lineare "Pipeline" -Mode "bewegen", aber keine Beispiele gefunden haben, die das Obige illustrieren; das heißt, wie Task 4 benachrichtigt wird, wenn Task 2 und 3 abgeschlossen sind, so dass es weiß, wann auch abgeschlossen werden soll.

Mein erster Gedanke ist es, Aufgabe 2 und 3 mit Aufgabe 4 zu "registrieren" und einfach den Status jeder registrierten Aufgabe zu überwachen - wenn Aufgabe 2 und 3 nicht mehr laufen, dann kann Aufgabe 4 stoppen (wenn die Ergebnisse Warteschlange ist auch leer).

Vielen Dank im Voraus.

+0

Sie können das [NuGet Paket für 'TPL Dataflow'] (https://www.nuget.org/packages/Microsoft.Tpl.Dataflow) für Ihr Projekt nicht hinzufügen? – VMAtm

+0

Richtig - für dieses spezielle Projekt ist TPL Dataflow nicht erlaubt. – bdcoder

Antwort

0

Das ist ein bisschen eine Erweiterung auf was schon Thomas gesagt hat.

Mit einem BlockingCollection können Sie GetConsumingEnumerable() darauf aufrufen und es nur als eine normale foreach-Schleife behandeln. Dadurch können Ihre Aufgaben "natürlich" enden. Das einzige, was Sie tun müssen, ist eine zusätzliche Aufgabe hinzuzufügen, die die Aufgaben 2 und 3 überwacht, um zu sehen, wann sie enden, und das vollständige Hinzufügen dieser Aufgaben aufzurufen.

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>(); 
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>(); 

Task RunProcess() 
{ 
    Task1Start(); 
    var t2 = Stage2Start(); 
    var t3 = Stage2Start(); 
    Stage2MonitorStart(t2,t3); 
    retrun Task4Start(); 
} 

public void Task1Start() 
{ 
    Task.Run(()=> 
    { 
     foreach(var item in GetFileSource()) 
     { 
      var processedItem = Process(item); 
      _stageOneBlockingCollection.Add(processedItem); 
     } 
     _stageOneBlockingCollection.CompleteAdding(); 
    } 
} 

public Task Stage2Start() 
{ 
    return Task.Run(()=> 
    { 
     foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable()) 
     { 
      var processedItem = ProcessStage2(item); 
      _stageTwoBlockingCollection.Add(processedItem); 
     } 
    } 
} 

void Stage2MonitorStart(params Task[] tasks) 
{ 
    //Once all tasks complete mark the collection complete adding. 
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding()); 
} 

public Task Stage4Start() 
{ 
    return Task.Run(()=> 
    { 
     foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable()) 
     { 
      var processedItem = ProcessStage4(item); 
      WriteToOutputFile(processedItem); 
     } 
    } 
} 
+0

Dies könnte es tun ... Wird versuchen ... – bdcoder

+0

Das macht das genaue Verhalten. Alle Aufgaben werden gleichzeitig ausgeführt. Beachten Sie in allen Funktionen die Task.Run. Die Task, die von 'RunProcess' zurückgegeben wird, ist eine Task, die darstellt, wenn der gesamte Prozess abgeschlossen ist. –

+0

Yup - sieht aus wie dein der Gewinner - vielen Dank !! – bdcoder

0

Wenn Sie die BlockingCollection auch für results_queue verwenden, können Sie diese Benachrichtigungen mithilfe der Eigenschaften BlockingCollection.IsCompleted und BlockingCollection.IsAddingCompleted implementieren. Prozess ist:

  • task1 Anrufe Methode BlockingCollection.CompleteAdding() auf der Eingangserfassung, wenn es keine weiteren Einträge in der Eingabedatei ist.
  • Eigenschaft task2 und task3 check regulary IsCompleted für die Eingabesammlung. Diese Eigenschaft ist wahr, wenn die Eingabesammlung leer ist und der Producer CompleteAdding() -Methode genannt wird. Nachdem diese Eigenschaft erfüllt ist, sind die Aufgaben 2 und 3 abgeschlossen, und sie können die CompleteAdding() -Methode für die Ergebniswarteschlange aufrufen und ihre Arbeit beenden.
  • task4 kann Datensätze in der result_queue verarbeiten, wenn sie eintreffen, oder kann warten, bis die Eigenschaft IsAddingCompleted der Ergebniswarteschlange wahr wird und dann mit der Verarbeitung beginnt. Die Arbeit von task4 ist beendet, wenn die IsCompleted-Eigenschaft in der Ergebniswarteschlange wahr ist.

Edit: Ich bin nicht sicher, ob Sie mit dieser IsCompleted und IsAddingCompleted Eigenschaften vertraut sind. Sie sind anders und sind perfekt für Ihren Fall. Ich glaube nicht, dass Sie neben den BlockingCollection-Eigenschaften noch weitere Synchronisationselemente benötigen. Bitte fragen Sie nach, ob zusätzliche Erklärungen benötigt werden!

BlockingCollection<int> inputQueue; 
    BlockingCollection<int> resultQueue; 

    public void StartTasks() 
    { 
     inputQueue = new BlockingCollection<int>(); 
     resultQueue = new BlockingCollection<int>(); 

     Task task1 = Task.Run(() => Task1()); 
     Task task2 = Task.Run(() => Task2_3()); 
     Task task3 = Task.Run(() => Task2_3()); 
     Task[] tasksInTheMiddle = new Task[] { task2, task3 }; 
     Task waiting = Task.Run(() => Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding())); 
     Task task4 = Task.Run(() => Task4()); 

     //Waiting for tasks to finish 
    } 
    private void Task1() 
    { 
     while(true) 
     { 
      int? input = ReadFromInputFile(); 
      if (input != null) 
      { 
       inputQueue.Add((int)input); 
      } 
      else 
      { 
       inputQueue.CompleteAdding(); 
       break; 
      } 
     } 
    } 

    private void Task2_3() 
    { 
     while(inputQueue.IsCompleted) 
     { 
      int input = inputQueue.Take(); 
      resultQueue.Add(input); 
     } 
    } 

    private void Task4() 
    { 
     while(resultQueue.IsCompleted) 
     { 
      int result = resultQueue.Take(); 
      WriteToOutputFile(result); 
     } 
    } 
+0

Nicht klar, wie das obige funktionieren würde. Aufgabe 2 und Aufgabe 3 könnten noch Datensätze hinzufügen, selbst wenn das Ende der Eingabedatei erreicht ist. Was ich wirklich wissen muss, ist, wenn Task 2 und Task 3 beendet sind (bis zum Abschluss) - also habe ich überlegt, den Status dieser Aufgaben zu überwachen, um sicherzustellen, dass alle Ergebnisse gemacht werden. – bdcoder

+0

Ich werde meine Antwort zu einem klareren bearbeiten. – Thomas

+0

Ich habe jetzt den Kommentar von Scott Chamberlain gelesen. Das Beste sollte wahrscheinlich meine Lösung mit seiner kombinieren. Es hat keinen Sinn zu warten, bis die letzte task4-Verarbeitung für Aufgabe 2 und 3 abgeschlossen ist, wenn Sie das parallel tun können. Aber von seiner Lösung ist es besser, eine andere Aufgabe (Continue.WhenAll) zu verwenden, um CompleteAdding in der Ergebnis-Warteschlange zu setzen - so sind Sie wirklich sicher, wenn diese Aufgaben abgeschlossen sind. Zwischen task4 kann Take() -Methode für die Ergebnis-Warteschlange aufrufen und gleichzeitig zur Ausgabedatei hinzufügen (während Task2 und 3 noch schreiben). – Thomas

0

Die Aufgabe Sie beschreiben, gut für eine TPL in eine TPL Dataflow library, kleine Add-on passen könnte sich (es kann über nuget package in Projekt aufgenommen werden, .NET 4.5 wird unterstützt), die Sie gerade leicht einzuführen, um die Strömung so etwas wie diese (Code aktualisiert basierend auf Kommentare mit BroadcastBlock):

var buffer = new BroadcastBlock<string>(); 
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */}); 
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */}); 
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ }); 

über Ihre Lösungslogik nicht sicher tun, also dachte ich, dass man einfach die Saiten arbeitet hier. Sie sollten asynchronously send alle eingehenden Daten für einen ersten Block (wenn Sie Post Ihre Daten, wenn Puffer überlastet, Nachricht verworfen wird) und Link-Blöcke untereinander, wie folgt aus:

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true }); 
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true }); 
consumer1.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true }); 
consumer2.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true }); 

foreach (var s in IncomingData) 
{ 
    await buffer.SendAsync(s); 
} 
buffer.Complete(); 

Wenn Verbraucher sollten beide Prozess alle Artikel, dann sollten Sie die BroadcastBlock verwenden (es kann einige issues about the guaranteed delivery auftreten), andere Möglichkeit ist, Ihre Nachrichten von den Verbrauchern zu filtern (vielleicht durch einen Rest von Nachrichten-ID nach Anzahl der Verbraucher), aber in diesem Fall sollten Sie verknüpfen zu einem anderen Verbraucher, der alle Nachrichten "fangen" wird, die aus irgendeinem Grund nicht verbraucht wurden.

Wie Sie sehen können, Verbindungen zwischen den Blöcken sind mit vollständiger Ausbreitung erstellt, so dass nach diesen können Sie einfach auf die .Completion Aufgabe Eigenschaft für ein resultsProcessor anbringt:

resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ }); 
+0

Beachten Sie, dass BufferBlock den Artikel nur dem ersten Verbraucher anbietet, der vom OP nicht vorgesehen ist. Um dies zu vermeiden, sollten Sie den BufferBlock mit einem TransmitBlock verbinden und den TransmitBlock mit jedem der Verbraucher verbinden. –

+0

Beachten Sie auch, dass SendAsync erwartet werden sollte. –

+0

@EyalPerry danke für das Teilen! – VMAtm

Verwandte Themen