Ich arbeite jetzt mit TPL Dataflow und muss meinen eigenen Aktionsblock implementieren.TPL Dataflow: Nachrichten von zwei eingehenden Blöcken nacheinander verarbeiten
Dieser Aktionsblock sollte Nachrichten von zwei verschiedenen Eingangsblöcken akzeptieren, diese Nachrichten in eine einzelne Warteschlange stellen und diese Warteschlange dann sequenziell verarbeiten. Der Hauptpunkt hier ist, dass zwei verschiedene Aufgaben nicht gleichzeitig ausgeführt werden sollten und ich möchte keine Sperren verwenden.
Hier ist meine Lösung, aber es funktioniert nicht richtig.
public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
where TInputLhs : class
where TInputRhs : class
{
public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }
private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();
private ITargetBlock<object> queue;
public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
{
queue = new ActionBlock<object>(x =>
{
if (x is TInputLhs)
{
actionLhs(x as TInputLhs);
}
else
{
actionRhs(x as TInputRhs);
}
});
inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
}
public void Complete()
{
queue.Complete();
}
public Task Completion
{
get { return queue.Completion; }
}
public void Fault(Exception exception)
{
queue.Fault(exception);
}
}
einfaches Anwendungsbeispiel:
static void Main(string[] args)
{
var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0));
var batchBlock = new BatchBlock<string>(3);
var processInOrderBlock = new OrderedActionBlock<string, string[]>(
new Action<string>((str) =>
{
Console.WriteLine(str);
}),
new Action<string[]>((batch) =>
{
Console.WriteLine("BATCH - " + string.Join(", ", batch));
}));
splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true });
splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true });
for (int i = 1; i <= 10; i++)
{
splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray()));
}
splitBlock.Complete();
processInOrderBlock.Completion.Wait();
return;
}
Der Ausgang:
xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .
Sieht aus wie in batchBlock
stecken Nachrichten. Und ich weiß nicht warum.
Warum können Sie nicht nur mit einer Parallelität Grenze von einem ein reguläres ActionBlock verwenden? Sie haben das fast, das Limit fehlt. – usr
@ usr, in diesem Fall sollte ich Code, der den Typ der Nachricht (ist es eine einzelne Nachricht oder eine Batch) in den Benutzercode, während ich solche Infrastruktur-Code irgendwo in der Bibliothek zu halten. Außerdem mag ich ActionBlock nicht, die "irgendein Objekt" (d. H. ActionBlock
Können Sie deutlicher sagen, warum Sie maxparallelism = 1 nicht einfach setzen können? Was hat das mit der Dosierung zu tun? – usr