Ich habe Producer/Consumer-Muster in meiner App implementiert TPL Dataflow. Ich habe das große Datenflussnetz mit ungefähr 40 Blöcken darin. Es gibt zwei Hauptfunktionsteile im Netz: Erzeugerteil und Verbraucherteil. Der Produzent sollte kontinuierlich eine Menge Arbeit für den Verbraucher bereitstellen, während der Verbraucher die ankommende Arbeit manchmal langsam abwickeln würde. Ich möchte den Producer pausieren, wenn der Kunde mit einer bestimmten Menge an Arbeitselementen beschäftigt ist. Ansonsten verbraucht die App viel Speicher/CPU und verhält sich nicht nachhaltig.Wie kann der schnelle Erzeuger angehalten werden, wenn der Verbraucher überfordert ist?
Ich habe Demo-Anwendung, die das Problem veranschaulicht:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
};
var boundedOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
};
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new ActionBlock<int>(async x =>
{
var delay = 1000;
if (x > 10) delay = 5000;
await Task.Delay(delay);
Console.WriteLine(x);
}, boundedOptions);
producerBlock.LinkTo(bufferBlock);
bufferBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(producerBlock);
broadcastBlock.LinkTo(consumerBlock);
bufferBlock.Post(1);
consumerBlock.Completion.Wait();
}
}
}
Die App druckt etwas wie folgt aus:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
, dass der Hersteller bedeutet hält Nachrichten an Verbraucher Spinnen und Schieben . Ich möchte, dass es pausiert und wartet, bis der Verbraucher den aktuellen Teil der Arbeit beendet hat, und dann sollte der Produzent weiterhin Nachrichten für den Verbraucher bereitstellen.
Meine Frage ist Zitat ähnlich wie andere question, aber es wurde nicht richtig beantwortet. Ich habe diese Lösung ausprobiert und es funktioniert hier nicht, so dass der Produzent den Verbraucher mit Nachrichten überschwemmen kann. Auch die Einstellung BoundedCapacity
funktioniert nicht.
Die einzige Lösung, die ich bisher vermute, ist, meinen eigenen Block zu erstellen, der die Zielblockwarteschlange überwacht und entsprechend der Warteschlange des Zielblocks agiert. Aber ich hoffe, es ist eine Art Overkill für dieses Problem.
Haben Sie in Erwägung gezogen, stattdessen 'Rx' zu verwenden? Werfen Sie einen Blick auf diese Antwort: http://stackoverflow.com/questions/2542764/tpl-vs-reactive-framework –
Ich hoffe, es wird keine Notwendigkeit für diese seit viel Zeit auf dem Dataflow und es passt meine Bedürfnisse sind gut. – kseen
In Ihrer Demo könnte der Produzent alle Nachrichten selbst produzieren, ohne Nachrichten von sich selbst über den Broadcast-Block zu empfangen. Ist Ihr realer Code auch so, oder ist dieser Producer → Producer Cycle notwendig? – svick