2016-04-12 8 views
10

begrenzen Ich habe eine Sammlung von 1000 Eingabe-Nachricht zu verarbeiten. Ich wiederhole die Eingabesammlung und starte die neue Aufgabe für jede Nachricht, die verarbeitet werden soll.Wie die maximale Anzahl der parallelen Aufgaben in C#

//Assume this messages collection contains 1000 items 
var messages = new List<string>(); 

foreach (var msg in messages) 
{ 
    Task.Factory.StartNew(() => 
    { 
    Process(msg); 
    }); 
} 

Können wir erraten, wie viele maximalen Nachrichten gleichzeitig zu der Zeit verarbeitet bekommen (normal Quad-Core-Prozessor vorausgesetzt), oder können wir die maximale Anzahl der Nachrichten beschränken zu der Zeit verarbeitet werden?

Wie wird sichergestellt, dass diese Nachricht in der gleichen Reihenfolge/Reihenfolge der Sammlung verarbeitet wird?

+0

Wie wäre es, die Nachrichten in Chargen aufzuteilen und jede Charge parallel laufen zu lassen? – bit

Antwort

7

SemaphoreSlim ist eine sehr gute Lösung in diesem Fall, und ich OP higly empfehlen, dies zu versuchen, , aber die Antwort von @ Manoj hat einen Fehler, wie er in den Kommentaren erwähnt wird. Auf semaphore sollte gewartet werden, bevor die Aufgabe wie folgt erzeugt wird.

Aktualisiert Antwort: Wie @Vasyl wies darauf hin, Semaphore vor Beendigung der Aufgaben entsorgt werden kann und die Ausnahme auslösen, wenn Release() -Methode aufgerufen wird so vor der Verwendung von Block Verlassen für den Abschluss aller erstellten Aufgaben warten müssen.

int maxConcurrency=10; 
var messages = new List<string>(); 
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    List<Task> tasks = new List<Task>(); 
    foreach(var msg in messages) 
    { 
     concurrencySemaphore.Wait(); 

     var t = Task.Factory.StartNew(() => 
     { 

      try 
      { 
       Process(msg); 
      } 
      finally 
      { 
       concurrencySemaphore.Release(); 
      } 
     }); 

     tasks.Add(t); 
    } 

    Task.WaitAll(tasks.ToArray()); 
} 
+0

Was ist, wenn 'Process' Methode für eine lange Zeit läuft? 'concurrencySemaphore.Release()' kann aufgerufen werden, wenn 'concurrencySemaphore' bereits entsorgt wird. Und als Ergebnis - "ObjectDisposedException". –

+0

@VasylZvarydchuk Sie haben Recht.Ich habe die Antwort aktualisiert – ClearLogic

1

Sie können einfach die maximale Concurrency Grad wie auf diese Weise festgelegt:

int maxConcurrency=10; 
var messages = new List<1000>(); 
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    foreach(var msg in messages) 
    { 
     Task.Factory.StartNew(() => 
     { 
      concurrencySemaphore.Wait(); 
      try 
      { 
       Process(msg); 
      } 
      finally 
      { 
       concurrencySemaphore.Release(); 
      } 
     }); 
    } 
} 
+1

Dies blockiert unnötig Threads, wenn der Thread-Pool mehr Threads als Ihre maximale Parallelität hat. – yaakov

11

Sie Parallel.Foreach nutzen könnten und sich auf MaxDegreeOfParallelism statt.

Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10}, 
msg => 
{ 
    // logic 
    Process(msg); 
}); 
+1

Dies ist genau die Art von Verarbeitung, für die 'Parallel.ForEach' gemacht wurde. –

+0

Und da die Task Parallel Library auf dem 'ThreadPool' aufbaut, können wir annehmen, dass sie nur so viele Aufgaben ausführen wird, wie das System Kerne hat, wenn wir sie nicht explizit spezifizieren. – Toxantron

+0

Würde dies sicherstellen, dass die Nachrichten in der gleichen Reihenfolge wie in der Liste verarbeitet werden? – bit

1

denken wäre besser, Benutzer Parallel LINQ

Parallel.ForEach(messages , 
    new ParallelOptions{MaxDegreeOfParallelism = 4}, 
      x => Process(x); 
     ); 

wobei x max degree of Parallelism ist

0

Wenn Sie in Ordnung brauchen (Verarbeitung könnte in beliebiger Reihenfolge beenden) Schlange stehen, gibt es keine Notwendigkeit für eine Semaphore. Old fashioned if Anweisungen funktionieren gut:

 const int maxConcurrency = 5; 
     List<Task> tasks = new List<Task>(); 
     foreach (var arg in args) 
     { 
      var t = Task.Run(() => { Process(arg); }); 

      tasks.Add(t); 

      if(tasks.Count >= maxConcurrency) 
       Task.WaitAny(tasks.ToArray()); 
     } 

     Task.WaitAll(tasks.ToArray()); 
Verwandte Themen