2016-09-20 2 views
0

Ich arbeite jetzt mit TPL Dataflow und muss meinen eigenen Aktionsblock implementieren.TPL Dataflow: Nachrichten von zwei eingehenden Blöcken nacheinander verarbeiten

Dieser Aktionsblock sollte Nachrichten von zwei verschiedenen Eingangsblöcken akzeptieren, diese Nachrichten in eine einzelne Warteschlange stellen und diese Warteschlange dann sequenziell verarbeiten. Der Hauptpunkt hier ist, dass zwei verschiedene Aufgaben nicht gleichzeitig ausgeführt werden sollten und ich möchte keine Sperren verwenden.

Hier ist meine Lösung, aber es funktioniert nicht richtig.

public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock 
    where TInputLhs : class 
    where TInputRhs : class 
{ 
    public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } } 
    public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } } 


    private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>(); 
    private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>(); 

    private ITargetBlock<object> queue; 

    public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs) 
    { 
     queue = new ActionBlock<object>(x => 
     { 
      if (x is TInputLhs) 
      { 
       actionLhs(x as TInputLhs); 
      } 
      else 
      { 
       actionRhs(x as TInputRhs); 
      } 
     }); 

     inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 
     inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 
    } 

    public void Complete() 
    { 
     queue.Complete(); 
    } 

    public Task Completion 
    { 
     get { return queue.Completion; } 
    } 

    public void Fault(Exception exception) 
    { 
     queue.Fault(exception); 
    } 
} 

einfaches Anwendungsbeispiel:

static void Main(string[] args) 
{ 
    var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0)); 

    var batchBlock = new BatchBlock<string>(3); 

    var processInOrderBlock = new OrderedActionBlock<string, string[]>(
     new Action<string>((str) => 
     { 
      Console.WriteLine(str); 
     }), 
     new Action<string[]>((batch) => 
     { 
      Console.WriteLine("BATCH - " + string.Join(", ", batch)); 
     })); 

    splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true }); 
    splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true }); 
    batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true }); 

    for (int i = 1; i <= 10; i++) 
    { 
     splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray())); 
    } 

    splitBlock.Complete(); 

    processInOrderBlock.Completion.Wait(); 

    return; 
} 

Der Ausgang:

xx 
xxxx 
xxxxxx 
xxxxxxxx 
xxxxxxxxxx 
BATCH - x, xxx, xxxxx 
Press any key to continue . . . 

Sieht aus wie in batchBlock stecken Nachrichten. Und ich weiß nicht warum.

+0

Warum können Sie nicht nur mit einer Parallelität Grenze von einem ein reguläres ActionBlock verwenden? Sie haben das fast, das Limit fehlt. – usr

+0

@ usr, in diesem Fall sollte ich Code, der den Typ der Nachricht (ist es eine einzelne Nachricht oder eine Batch) in den Benutzercode, während ich solche Infrastruktur-Code irgendwo in der Bibliothek zu halten. Außerdem mag ich ActionBlock nicht, die "irgendein Objekt" (d. H. ActionBlock ) akzeptiert und statische Typisierung bevorzugt. –

+0

Können Sie deutlicher sagen, warum Sie maxparallelism = 1 nicht einfach setzen können? Was hat das mit der Dosierung zu tun? – usr

Antwort

2

Sieht aus wie queue abgeschlossen ist, wenn jede von inputLhs oder inputRhs abgeschlossen ist (wenn die Verwendung PropagateCompletion = true Option während des Linkens).

Also müssen wir das ändern:

inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true }); 

dazu:

Task.WhenAll(InputLhs.Completion, InputRhs.Completion) 
    .ContinueWith(_ => queue.Complete()); 
Verwandte Themen