8

Ich möchte eine priorisierte ActionBlock<T> implementieren. So dass ich einige TInput Elemente mit einer Predicate<T> bedingt Priorität geben kann.
Ich lese Parallel Extensions Extras Samples und Guide to Implementing Custom TPL Dataflow Blocks.
Aber immer noch nicht herausfinden, wie kann ich dieses Szenario implementieren.
---------------------------- BEARBEITEN ------------------- --------
Es gibt einige Aufgaben, von denen 5 gleichzeitig ausgeführt werden können. Wenn Benutzer die Schaltfläche drücken, sollten einige (abhängig von der Prädikatfunktion) mit der höchsten Priorität ausgeführt werden.
Tatsächlich schreibe ich diesen CodeAnpassen ActionBlock <T>

TaskScheduler taskSchedulerHighPriority; 
ActionBlock<CustomObject> actionBlockLow; 
ActionBlock<CustomObject> actionBlockHigh; 
... 
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5); 
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0); 
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1); 
... 
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow }); 
...  
if (predicate(customObject)) 
    actionBlockHigh.Post(customObject); 
else 
    actionBlockLow.Post(customObject); 

Aber es scheint Priorität überhaupt bewirkt nicht statt.
---------------------------- BEARBEITEN ------------------
finde ich die Tatsache, dass, wenn ich diese Codezeile verwenden:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow }); 

Ursache Anwendung korrekt Prioritäten der Tasks beobachten, sondern nur eine Aufgabe zu einer Zeit werden, ausführen kann, inzwischen den ersten Codeblock verwenden, die in gezeigt wird fließend, weil die Anwendung 5 Tasks gleichzeitig, aber in unangemessener Reihenfolge ausführt.

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow }); 

Update:
Tanks svick, i MaxMessagesPerTask für taskSchedulerLow angeben sollte.

+2

Was diktiert die Priorität? Ist es etwas, das nicht mit "T" zusammenhängt? Oder ist die Priorität eine inhärente/abgeleitete Eigenschaft von 'T'? – casperOne

+0

Sie können einen benutzerdefinierten Pufferblock erstellen, der eine ConcurrentPriorityQueue verwendet, oder Sie können einen benutzerdefinierten asynchronen Transfomationsblock erstellen. Beide Optionen sind nicht trivial. Stimmen Sie auch mit @ casperOne überein, was bedeutet Priorität in Ihrem Fall? –

Antwort

7

Ihre Frage enthält nicht viele Details, daher ist das Folgende nur eine Schätzung dessen, was Sie benötigen.

Ich denke, der einfachste Weg, dies zu tun ist, zwei ActionBlock s zu haben, die auf verschiedenen Prioritäten auf QueuedTaskScheduler from ParallelExtensionsExtras laufen. Sie würden mit dem Prio-Objekt mit hoher Priorität und dann mit dem Prädikat verbinden. Um sicherzustellen, dass die Task s mit hoher Priorität nicht warten, setzen Sie MaxMessagesPerTask des Blocks mit niedriger Priorität.

Im Code würde es in etwa so aussehen:

static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate) 
{ 
    var buffer = new BufferBlock<T>(); 

    var scheduler = new QueuedTaskScheduler(1); 

    var highPriorityScheduler = scheduler.ActivateNewQueue(0); 
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1); 

    var highPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = highPriorityScheduler 
     }); 
    var lowPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = lowPriorityScheduler, 
      MaxMessagesPerTask = 1 
     }); 

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate); 
    buffer.LinkTo(lowPriorityBlock); 

    return buffer; 
} 

Dies ist nur eine Skizze von dem, was man tun könnte, zum Beispiel, Completion des zurückgegebenen Blockes verhält sich nicht korrekt.

+0

Bitte lesen Sie das bearbeitete Tag – Rzassar

+1

In Ihrem Code geben Sie nicht 'MaxMessagesPerTask' für Ihren Block mit niedriger Priorität an. Wie gesagt, das ist sehr wichtig. – svick