2017-02-02 2 views
-1

Was ist die beste Queue Datenstruktur in C# zu verwenden, wenn die Warteschlange für Enqueue() für mehrere Threads verfügbar sein muss, aber nur Dequeue() für einen einzigen Hauptthread benötigt? Meine Gewindestruktur sieht wie folgt aus:Warteschlangen und Threading

  • Hauptthread - Consumer
  • Sub Thread1 - Hersteller
  • Sub Thread2 - Hersteller
  • Sub Thread3 - Hersteller

Ich habe eine einzelne Queue<T> queue, die alle Elemente enthält, die von den Unterthreads und den Hauptthreadaufrufenerzeugt werdenbis es leer ist. Ich habe folgende Funktion, die zu diesem Zweck an meinem Haupt-Thread aufgerufen wird.

public void ConsumeItems() 
{ 
    while (queue.Count > 0) 
    { 
     var item = queue.Dequeue(); 
     ... 
    } 
} 

der Haupt-Thread ruft diese Funktion auf einmal durch jede Fadenschlinge und ich möchte sicherstellen, dass ich queue in einem Thread-sicher Herren bin Zugriff aber ich möchte auch Sperren queue wenn mögliche Gründe für die Leistung zu vermeiden.

+2

Angenommen, Sie etwas von [System.Collections.Concurrent] (https://msdn.microsoft.com/en-us/library/system.collections.concurrent (v = vs.110 wollen werden). aspx). – Equalsk

+1

Ich würde explizites Warteschlangenmanagement vermeiden und stattdessen die TPL Dataflow-Bibliothek betrachten. –

Antwort

1

Die eine, die Sie verwenden möchten, ist eine BlockingCollection<T>, die standardmäßig durch eine ConcurrentQueue<T> unterstützt wird. Um Elemente aus der Warteschlange zu bekommen würden Sie .GetConsumingEnumerable() aus dem Inneren eines foreach verwenden

public BlockingCollection<Item> queue = new BlockingCollection<Item>(); 

public void LoadItems() 
{ 
    var(var item in SomeDataSource()) 
    { 
     queue.Add(item); 
    } 
    queue.CompleteAdding(); 
} 

public void ConsumeItems() 
{ 
    foreach(var item in queue.GetConsumingEnumerable()) 
    { 
     ... 
    } 
} 

Wenn die Warteschlange der foreach leer ist, wird der Thread blockieren und freizugeben, sobald ein Artikel verfügbar ist. Sobald .CompleteAdding() aufgerufen wurde, beendet foreach alle Elemente in der Warteschlange, aber sobald sie leer ist, wird der Foreach-Block beendet.

Bevor Sie dies jedoch tun, würde ich empfehlen, Sie in TPL Dataflow suchen, damit Sie nicht mehr die Warteschlangen oder die Threads verwalten müssen. Sie können Logikketten erstellen und jeder Block in der Kette kann eine separate Ebene der Parallelität haben.

public Task ProcessDataAsync(IEnumerable<SomeInput> input) 
{ 
    using(var outfile = new File.OpenWrite("outfile.txt")) 
    { 
     //Create a convert action that uses the number of processors on the machine to create parallel blocks for processing. 
     var convertBlock = new TransformBlock<SomeInput, string>(x => CpuIntensiveConversion(x), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = Enviorment.ProcessorCount}); 

     //Create a single threaded action that writes out to the textwriter. 
     var writeBlock = new ActionBlock<string>(x => outfile.WriteLine(x)) 

     //Link the convert block to the write block. 
     convertBlock.LinkTo(writeBlock, new DataflowLinkOptions{PropagateCompletion = true}); 

     //Add items to the convert block's queue. 
     foreach(var item in input) 
     { 
       await convertBlock.SendAsync(); 
     } 

     //Tell the convert block we are done adding. This will tell the write block it is done processing once all items are processed. 
     convertBlock.Complete(); 

     //Wait for the write to finish writing out to the file; 
     await writeBlock.Completion; 
    } 
} 
+0

Hoppla! Ich habe meine Beispiele rückwärts geschrieben. Ein Produzent, viele Konsumenten. Die Lösung ändert sich nicht, nur die Beispiele müssen. –