7

Mit der TPL Datenfluß-Bibliothek, würde Ich mag, so etwas tun:mit TPL Dataflow, kann ich alle Beiträge abbrechen und dann einen hinzufügen?

myActionBlock.Post(newValue, cancelAllPreviousPosts: true); 

Es scheint, dass die Annullierung Token auf ActionBlock die ganze Sache aufhebt; Ich müsste einen neuen ActionBlock machen, wenn ich diesen setze. Ist es möglich mit ActionBlock eine teilweise Stornierung durchzuführen?

Beiträge, die noch nicht verarbeitet wurden, sollten nicht versucht werden. Es wäre schön, wenn ein Kündigungs-Token verfügbar wäre, um den aktuell ausgeführten Post einzusehen.

+0

Ich postete diese eine Weile zurück, aber ich habe seitdem meine eigene Bibliothek gemacht; Ich habe eine "neueste" Aktionswarteschlange in meiner Kts.ActorsLite-Bibliothek: https://github.com/BrannonKing/Kts.ActorsLite – Brannon

Antwort

4

Werfen Sie einen Blick auf BroadcastBlock<T>, die nur den neuesten Artikel enthält. Sie können einen Broadcast-Block vor eine ActionBlock<T> stellen.

Während ein neues Element im Broadcast-Block gepostet wird, wird das Element, das gerade vom Aktionsblock verarbeitet wird, nicht gelöscht. Es überschreibt alle vorhandenen Elemente, die bereits vom Broadcast-Block gehalten werden. Verwerfen von älteren Nachrichten, die noch nicht vom Aktionsblock verarbeitet wurden. Wenn der Aktionsblock seinen aktuellen Eintrag vervollständigt, nimmt er den letzten Eintrag, der an den Rundsendeblock gesendet wurde.

+0

Dies ist die Antwort, nach der ich gesucht habe. Vielen Dank. – Brannon

0

Es gibt nichts, wie dies direkt in TPL Dataflow, aber ich kann mehrere Möglichkeiten, wie Sie es selbst implementieren könnte:

  1. Wenn Sie müssen, um den modifizierten Block als zu behandeln, nicht in der Lage normaler Datenflussblock (z. B. keine Unterstützung für LinkTo()), dann würde ein einfacher Weg einen Typ schreiben, der ActionBlock umschließt, aber dessen Elemente auch ein Flag enthalten, das angibt, ob sie verarbeitet werden sollen. Wenn Sie angeben, werden alle diese Flags zurückgesetzt, sodass diese Elemente übersprungen werden.

    Der Code wie folgt aussehen könnte:

    class CancellableActionBlock<T> 
    { 
        private class Item 
        { 
         public T Data { get; private set; } 
         public bool ShouldProcess { get; set; } 
    
         public Item(T data) 
         { 
          Data = data; 
          ShouldProcess = true; 
         } 
        } 
    
        private readonly ActionBlock<Item> actionBlock; 
        private readonly ConcurrentDictionary<Item, bool> itemSet; 
    
        public CancellableActionBlock(Action<T> action) 
        { 
         itemSet = new ConcurrentDictionary<Item, bool>(); 
         actionBlock = new ActionBlock<Item>(item => 
         { 
          bool ignored; 
          itemSet.TryRemove(item, out ignored); 
    
          if (item.ShouldProcess) 
          { 
           action(item.Data); 
          } 
         }); 
        } 
    
        public bool Post(T data, bool cancelAllPreviousPosts = false) 
        { 
         if (cancelAllPreviousPosts) 
         { 
          foreach (var item in itemSet.Keys) 
          { 
           item.ShouldProcess = false; 
          } 
          itemSet.Clear(); 
         } 
    
         var newItem = new Item(data); 
         itemSet.TryAdd(newItem, true); 
         return actionBlock.Post(newItem); 
        } 
    
        // probably other members that wrap actionBlock members, 
        // like Complete() and Completion 
    } 
    
  2. Wenn Sie etwas erstellen möchten, die mehr zusammensetzbare und wiederverwendbar ist, können Sie einen speziellen Block erstellen, für die Stornierung. Du könntest das implementieren, indem du BufferBlock s miteinander verknüpfst, wobei der dritte eine Kapazität von 1 und der zweite eine unbegrenzte Kapazität haben würde. Auf diese Weise befinden sich fast alle in der Warteschlange befindlichen Elemente im zweiten Block, sodass Sie die Löschung durchführen können, indem Sie diesen Block gegen einen neuen austauschen. Die gesamte Struktur würde durch den ersten und den dritten Block dargestellt werden.

    Die Probleme mit diesem Ansatz besteht darin, dass die Stornierung eine Verzögerung von 1 Element hat (die, die im dritten Block ist). Außerdem habe ich keine gute Schnittstelle dafür gefunden.

+0

Ich vergebe hier die +50 für die Mühe am Beispiel. Vielen Dank. – Brannon

0

Neben Monroe Thomas Antwort ist es wichtig zu verstehen, dass die ActionBlock die BroadcastBlock folgende braucht es BoundedCapacity begrenzt auf 1 oder es wird speichern und verarbeiten jede Nachricht des Sendeblocks, auch wenn es immer noch Ausführen.
Ein Codebeispiel geht hier:

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber => 
{ 
    await Task.Delay(100); 
    Console.WriteLine($">{ThisNumber}"); 
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null); 
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true }); 

for(int IX = 0; IX < 128; IX++) 
{ 
    await ThrottleBlock.SendAsync(IX); 
    await Task.Delay(10); 
} 

Daraus ergibt sich wie folgt zusammen:

>0 
>6 
>12 
>20 
>27 
>34 
>41 
>48 
>55 
>62 
>68 
>75 
>82 
>88 
>95 
>101 
>108 
>115 
>122 
>127 

Viel Spaß!
-Simon

Verwandte Themen