2016-08-13 2 views
5

Ich habe Producer/Consumer-Muster in meiner App implementiert TPL Dataflow. Ich habe das große Datenflussnetz mit ungefähr 40 Blöcken darin. Es gibt zwei Hauptfunktionsteile im Netz: Erzeugerteil und Verbraucherteil. Der Produzent sollte kontinuierlich eine Menge Arbeit für den Verbraucher bereitstellen, während der Verbraucher die ankommende Arbeit manchmal langsam abwickeln würde. Ich möchte den Producer pausieren, wenn der Kunde mit einer bestimmten Menge an Arbeitselementen beschäftigt ist. Ansonsten verbraucht die App viel Speicher/CPU und verhält sich nicht nachhaltig.Wie kann der schnelle Erzeuger angehalten werden, wenn der Verbraucher überfordert ist?

Ich habe Demo-Anwendung, die das Problem veranschaulicht:

mesh

using System; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace DataflowTest 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var options = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       EnsureOrdered = false 
      }; 

      var boundedOptions = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       EnsureOrdered = false, 
       BoundedCapacity = 5 
      }; 

      var bufferBlock = new BufferBlock<int>(boundedOptions); 
      var producerBlock = new TransformBlock<int, int>(x => x + 1, options); 
      var broadcastBlock = new BroadcastBlock<int>(x => x, options); 

      var consumerBlock = new ActionBlock<int>(async x => 
      { 
       var delay = 1000; 
       if (x > 10) delay = 5000; 

       await Task.Delay(delay); 

       Console.WriteLine(x); 
      }, boundedOptions); 

      producerBlock.LinkTo(bufferBlock); 
      bufferBlock.LinkTo(broadcastBlock); 
      broadcastBlock.LinkTo(producerBlock); 
      broadcastBlock.LinkTo(consumerBlock); 

      bufferBlock.Post(1); 

      consumerBlock.Completion.Wait();    
     }   
    } 
} 

Die App druckt etwas wie folgt aus:

2 
1 
3 
4 
5 
69055 
69053 
69054 
69057 
438028 
438040 
142303 
438079 

, dass der Hersteller bedeutet hält Nachrichten an Verbraucher Spinnen und Schieben . Ich möchte, dass es pausiert und wartet, bis der Verbraucher den aktuellen Teil der Arbeit beendet hat, und dann sollte der Produzent weiterhin Nachrichten für den Verbraucher bereitstellen.

Meine Frage ist Zitat ähnlich wie andere question, aber es wurde nicht richtig beantwortet. Ich habe diese Lösung ausprobiert und es funktioniert hier nicht, so dass der Produzent den Verbraucher mit Nachrichten überschwemmen kann. Auch die Einstellung BoundedCapacity funktioniert nicht.

Die einzige Lösung, die ich bisher vermute, ist, meinen eigenen Block zu erstellen, der die Zielblockwarteschlange überwacht und entsprechend der Warteschlange des Zielblocks agiert. Aber ich hoffe, es ist eine Art Overkill für dieses Problem.

+0

Haben Sie in Erwägung gezogen, stattdessen 'Rx' zu verwenden? Werfen Sie einen Blick auf diese Antwort: http://stackoverflow.com/questions/2542764/tpl-vs-reactive-framework –

+0

Ich hoffe, es wird keine Notwendigkeit für diese seit viel Zeit auf dem Dataflow und es passt meine Bedürfnisse sind gut. – kseen

+0

In Ihrer Demo könnte der Produzent alle Nachrichten selbst produzieren, ohne Nachrichten von sich selbst über den Broadcast-Block zu empfangen. Ist Ihr realer Code auch so, oder ist dieser Producer → Producer Cycle notwendig? – svick

Antwort

4

Wenn Sie den Producer → Buffer → Broadcast-Zyklus intakt halten müssen, müssen Sie den Broadcast-Block durch einen anderen Block ersetzen, der immer noch Nachrichten empfängt, aber wartet, wenn eines seiner Ziele voll ist.

Solange Sie die Ziele dieser Blöcke wissen, wenn Sie es erstellen, können Sie bauen es ActionBlock (Code aus another answer of mine kopiert) mit:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets) 
{ 
    var block = new ActionBlock<T>(
     async item => 
     { 
      foreach (var target in targets) 
      { 
       await target.SendAsync(item); 
      } 
     }, new ExecutionDataflowBlockOptions 
     { 
      BoundedCapacity = options.BoundedCapacity, 
      CancellationToken = options.CancellationToken 
     }); 

    block.Completion.ContinueWith(task => 
    { 
     foreach (var target in targets) 
     { 
      if (task.Exception != null) 
       target.Fault(task.Exception); 
      else 
       target.Complete(); 
     } 
    }); 

    return block; 
} 

diese verwenden, können Sie den Broadcast-Block deklarieren :

var broadcastBlock = CreateGuaranteedBroadcastBlock(
    boundedOptions, producerBlock, consumerBlock); 

(Sie müssen auch die LinkTo Linien entfernen, die von broadcastBlock verknüpfen.)

Ein Problem mit Ihrem ursprünglichen Code, das nicht behoben werden kann, ist der Abschluss, aber das ist ein schwieriges Problem in TPL Dataflow mit Zyklen im Allgemeinen.

+0

Bezüglich der Fertigstellung, was ist, wenn mein Netzwerk kontinuierlich sein wird? Da es in Zukunft keine Fertigstellung gibt, sollte es weiter funktionieren, solange die App funktioniert. – kseen

+0

Ich habe gerade dieses 'GuaranteedBroadcastBlock' in meiner Demo-Anwendung ausprobiert und es funktioniert wie ein Zauber! Perfekt! Ich danke dir sehr. – kseen

+0

Das ist der beste Fall: Sie brauchen keine Fertigstellung, also ist es in Ordnung, es funktioniert nicht. – svick

0

Es sieht so aus, als ob Ihr Producer eine Sequenz generiert, so dass der gesamte Producer → buffer → broadcast cycle nicht benötigt wird. Stattdessen werden alle drei Blöcke durch eine async Schleife ersetzt werden könnte, die das nächste Element erzeugt und sendet sie dann an den Verbraucher await SendAsync() mit:

Task.Run(async() => 
{ 
    int i = 1; 
    while (true) 
    { 
     await consumerBlock.SendAsync(i); 
     i++; 
    } 
    consumerBlock.Complete(); 
}); 

Auf diese Weise, wenn der Verbraucher seine Kapazität erreicht, wird await SendAsync(), dass der Hersteller sicher, wartet, bis der Konsument einen Artikel konsumiert.

Wenn Sie einen solchen Producer in einen Datenflussblock einkapseln möchten, können Sie z. Verknüpfen Sie es mit dem Verbraucher, you can.

+0

Mein wirklicher "Produzent" ist eine Gruppe von Blöcken, die eine Kommentarseite laden (die den Link zur nächsten Kommentarseite enthält), den Inhalt der aktuellen Kommentarseite analysieren, die Kommentare an den Verbraucher senden und diesen Zyklus erneut starten, wobei die Adresse von übergeben wird nächste Kommentarseite zum ersten Block in diesem Producer-Zyklus. Also, das ist leider nicht nur eine Sequenz. Es ist wie eine verknüpfte Sequenz, bei der jedes Element in der Sequenz die Adresse zum nächsten Element in ihm hat und das letzte Element in der Sequenz nicht die Adresse des nächsten Elements hat. Entschuldigung für diese Frage ist so einfach. – kseen

+0

Ich habe gerade das Diagramm erstellt, das die reale Situation besser darstellt. Hier gehts: http://imgur.com/iEklfeG – kseen

Verwandte Themen