14

Während ich Parallel.ForEach in meinem Programm verwendete, stellte ich fest, dass einige Threads nie zu Ende schienen. Tatsächlich brachte es immer neue Fäden hervor, ein Verhalten, das ich nicht erwartet hatte und definitiv nicht möchte. die, genau wie mein ‚echtes‘ ProgrammParallel.ForEach erzeugt immer neue Threads

konnte ich dieses Verhalten mit dem folgenden Code reproduzieren, sowohl Prozessor und Speicher verwendet eine Menge (.NET 4.0-Code):

public class Node 
{ 
    public Node Previous { get; private set; } 

    public Node(Node previous) 
    { 
     Previous = previous; 
    } 
} 

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     DateTime startMoment = DateTime.Now; 
     int concurrentThreads = 0; 

     var jobs = Enumerable.Range(0, 2000); 
     Parallel.ForEach(jobs, delegate(int jobNr) 
     { 
      Interlocked.Increment(ref concurrentThreads); 

      int heavyness = jobNr % 9; 

      //Give the processor and the garbage collector something to do... 
      List<Node> nodes = new List<Node>(); 
      Node current = null; 
      for (int y = 0; y < 1024 * 1024 * heavyness; y++) 
      { 
       current = new Node(current); 
       nodes.Add(current); 
      } 

      TimeSpan elapsed = DateTime.Now - startMoment; 
      int threadsRemaining = Interlocked.Decrement(ref concurrentThreads); 
      Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining); 
     }); 
    } 
} 

Wenn lief auf meinem Quad-Core, es beginnt zunächst mit 4 gleichzeitigen Threads, wie Sie es erwarten würden. Mit der Zeit werden jedoch immer mehr Threads erstellt. Schließlich wirft das Programm dann eine OutOfMemoryException:

[00:00] Job 0 complete. 3 threads remaining. 
[00:01] Job 1 complete. 4 threads remaining. 
[00:01] Job 2 complete. 4 threads remaining. 
[00:02] Job 3 complete. 4 threads remaining. 
[00:05] Job 9 complete. 5 threads remaining. 
[00:05] Job 4 complete. 5 threads remaining. 
[00:05] Job 5 complete. 5 threads remaining. 
[00:05] Job 10 complete. 5 threads remaining. 
[00:08] Job 11 complete. 5 threads remaining. 
[00:08] Job 6 complete. 5 threads remaining. 
... 
[00:55] Job 67 complete. 7 threads remaining. 
[00:56] Job 81 complete. 8 threads remaining. 
... 
[01:54] Job 107 complete. 11 threads remaining. 
[02:00] Job 121 complete. 12 threads remaining. 
.. 
[02:55] Job 115 complete. 19 threads remaining. 
[03:02] Job 166 complete. 21 threads remaining. 
... 
[03:41] Job 113 complete. 28 threads remaining. 
<OutOfMemoryException> 

Das Diagramm zur Speicherauslastung für das Experiment oben ist wie folgt:

Processor and memory usage

(Der Screenshot in Niederländisch ist, der obere Teil repräsentiert Prozessor Verwendung, die untere Teilspeicherverwendung) Wie Sie sehen können, sieht es so aus, als ob ein neuer Thread fast jedes Mal erzeugt wird, wenn der Garbage Collector in den Weg kommt (wie man an den Einbrüchen der Speichernutzung sehen kann).

Kann mir jemand erklären, warum das passiert und was ich dagegen tun kann? Ich will nur .NET Laichen neue Themen zu stoppen, und beenden Sie die vorhandenen Threads zuerst ...

+0

ich eine Follow-up-Frage shooted habe [ ] (http://stackoverflow.com/questions/15381174/how-to-count-the-amount-of-concurrent-threads-in-net-application) Wenn Sie die Threads direkt c cotieren, erhöhen sie sich meist nur (sehr selten und unwesentlich abnehmend) – Fulproof

Antwort

16

Sie können die maximale Anzahl von Threads begrenzen, die durch die Angabe einer ParallelOptions Instanz mit dem MaxDegreeOfParallelism Objekt erstellt erhalten:

var jobs = Enumerable.Range(0, 2000); 
ParallelOptions po = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount 
}; 

Parallel.ForEach(jobs, po, jobNr => 
{ 
    // ... 
}); 

was warum Sie bekommen das Verhalten Sie beobachten: die TPL (die PLINQ zugrunde liegt) ist in der Standardeinstellung auf freiem Fuß, die optimale Anzahl von Threads zu erraten, zu verwenden. Wann immer eine parallele Aufgabe blockiert, kann der Aufgabenplaner einen neuen Thread erstellen, um den Fortschritt aufrechtzuerhalten. In Ihrem Fall könnte die Blockierung implizit erfolgen. zum Beispiel durch den Anruf oder (wie Sie beobachtet haben) während der Speicherbereinigung.

Von Concurrency Levels Tuning with Task Parallel Library (How Many Threads to Use?):

Da die TPL Standardrichtlinie einen Thread pro Prozessor zu verwenden ist, können wir schließen, dass TPL geht zunächst davon aus, dass die Arbeitsbelastung einer Aufgabe ist ~ 100% arbeiten und 0% Warten, und wenn die anfängliche Annahme fehlschlägt und die Aufgabe in einen Wartezustand eintritt (dh beginnt, zu blockieren) - TPL mit der Freiheit, Fäden entsprechend hinzuzufügen.

+1

Erläuterung: 'Parallel.ForEach()' ist * kein * Teil von PLINQ. – svick

+0

@svick: Du hast recht; es ist TPL. – Douglas

+0

Danke für Ihre Antwort. Es macht Sinn, dass wenn ein Thread blockiert, der TPL einen neuen Thread erstellt, um den Fortschritt am Laufen zu halten, ich einfach nicht dachte, dass die Bereinigung des Garbage Collectors als ein Thread gewertet wird, der blockiert wird. Auch danke für den Link, den Sie gepostet haben, es erklärt, warum Sie eigentlich mehr Threads als Kerne wollen, habe ich nie gedacht. – Astrotrain

6

Sie sollten wahrscheinlich ein wenig über die Funktionsweise des Task Schedulers lesen.

http://msdn.microsoft.com/en-us/library/ff963549.aspx (letztere Hälfte der Seite)

„.NET-Thread-Pool verwaltet automatisch die Anzahl der Arbeiter Threads im Pool. Es fügt und entfernt Gewinde nach Einbau- Heuristiken. Das .NET-Thread-Pool hat zwei Hauptmechanismen für die Injektion von Threads: ein Verhinderungsmechanismus, der Worker-Threads fügt, wenn keine Fortschritte gemacht wird in der Warteschlange Artikel und eine Hill-Climbing Heuristik, die versucht, den Durchsatz zu maximieren, während so wenige Threads verwenden wie möglich.

Das Ziel der Verhinderungsvermeidung ist die Vermeidung von Deadlock. Diese Art von Deadlock kann auftreten, wenn ein Worker-Thread auf ein -Ereignis wartet, das nur von einem Arbeitselement erfüllt werden kann, das noch aussteht in den globalen oder lokalen Warteschlangen des Thread-Pools. Wenn es eine feste Anzahl von Worker-Threads gäbe und alle diese Threads ähnlich blockiert wären, könnte das System niemals weitere Fortschritte machen. Das Hinzufügen eines neuen Workerts löst das Problem.

Ein Ziel der Hill-Climbing-Heuristik ist die Nutzung von Kernen zu verbessern, wenn Themen von I/O oder anderen Wartebedingungen blockiert werden, die den Prozessor Stall . Der verwaltete Threadpool enthält standardmäßig einen Worker-Thread pro Kern. Wenn einer dieser Worker-Threads blockiert wird, besteht die Möglichkeit, dass ein Core möglicherweise nicht ausreichend ausgelastet ist ( ), abhängig von der Gesamtauslastung des Computers. Die Thread-Injektion Logic unterscheidet nicht zwischen einem blockierten Thread und einem Thread , der eine langwierige, rechenintensive Operation ausführt. Daher , wenn der Thread-Pool globale oder lokale Warteschlangen anstehende Arbeit Artikel, aktive Arbeitselemente enthalten, die eine lange Zeit in Anspruch nehmen (mehr als ein halbe Sekunde) ausführen kann die Schaffung neuer Thread-Pool Arbeiter Threads auslösen.“

Sie eine Aufgabe als LongRunning markieren aber dies hat den Nebeneffekt, einen Thread für sie von außerhalb des Thread-Pool Zuweisung was bedeutet, dass die Aufgabe nicht inlined werden kann.

Denken Sie daran, dass die ParallelFor behandelt die Arbeit, die es gegeben ist als Blöcke, selbst wenn die Arbeit in einer Schleife ziemlich klein ist, kann die Gesamtarbeit, die von der Aufgabe ausgeführt wird, die durch das Aussehen aufgerufen wird, für den Scheduler länger erscheinen.

Die meisten Aufrufe an den GC in und von ihnen selbst blockieren nicht (es läuft auf einem separaten Thread), aber wenn Sie darauf warten, dass GC beendet wird, blockiert dies. Denken Sie auch daran, dass der GC Speicher neu anordnet, so dass dies einige Nebenwirkungen (und Blockierung) haben kann, wenn Sie versuchen, Speicher während der Ausführung von GC zuzuweisen. Ich habe hier keine Besonderheiten, aber ich weiß, dass der PPL einige Speicherzuweisungsfunktionen speziell für die gleichzeitige Speicherverwaltung aus diesem Grund hat.

Mit Blick auf die Ausgabe Ihres Codes scheint es, dass die Dinge für viele Sekunden laufen. Ich bin also nicht überrascht, dass Sie Thread-Injektion sehen. Ich erinnere mich jedoch, dass die Standard-Thread-Pool-Größe ungefähr 30 Threads beträgt (wahrscheinlich abhängig von der Anzahl der Kerne in Ihrem System). Ein Thread benötigt ungefähr eine MB Speicher, bevor Ihr Code mehr zuweist, daher ist mir nicht klar, warum Sie hier eine Ausnahme wegen zu wenig Speicher erhalten könnten.

+1

Danke für die Hintergrundinformationen. Wie Sie sehen können, weist jede Aufgabe zwischen 0 und 8 Millionen Knoten zu und benötigt daher viel Speicher. Wenn mehr Aufgaben gestartet werden, wird der Speicher knapp und der GC muss mehr Speicher freigeben, um den neuen Aufgaben gerecht zu werden. Ich vermute, dass der Scheduler nicht erkennen kann, ob ein Thread blockiert ist, weil er auf Speicher oder auf etwas anderes wartet, und startet in beiden Fällen einen neuen Thread. Offensichtlich, wenn der blockierte Thread auf den Speicher wartet, verschlechtert dies nur den Effekt ... – Astrotrain

+0

Ich sehe den gleichen Effekt, und ich bin ein wenig erstaunt, dass dies von der TPL nicht besser gehandhabt wird. Ich lade Datenstücke aus dem Internet von 3M + an verschiedenen Orten ein und verarbeite jeden Chunk, nachdem er in den Speicher geladen wurde. Während Tasks darauf warten, dass Daten aus dem Web geladen werden, sieht die TPL das Blockieren der Weblast und plant neue Tasks, um noch mehr Daten zu laden. Nach einer Weile gibt es so viele Aufgaben, die Datenchunks laden, dass ich am Ende OOM-Exceptions bekomme. Durch die Wahl von 'Parallel.ForEach' löste ich das Problem mit einem festen Wert' MaxDegreeOfParallelism'. –

1

ich die Follow-up-Frage gestellt habe "How to count the amount of concurrent threads in .NET application?"

Wenn die Fäden direkt zu zählen, deren Zahl in Parallel.For() meist ((sehr selten und nur unwesentlich abnimmt) nur erhöht und nicht nach Schleife releleased Fertigstellung.

dies sowohl in der Release- und Debug-Modus Karo, mit

ParallelOptions po = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount 
}; 

und ohne

Die Ziffern variieren, aber Schlussfolgerungen sind die gleichen. Hier

ist bereit Code war ich mit, wenn jemand will spielen: „Wie die Menge der gleichzeitigen Threads in .NET-Anwendung zählen“

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Edit4Posting 
{ 
public class Node 
{ 

    public Node Previous { get; private set; } 
    public Node(Node previous) 
    { 
    Previous = previous; 
    } 
    } 
    public class Edit4Posting 
    { 

    public static void Main(string[] args) 
    { 
     int concurrentThreads = 0; 
     int directThreadsCount = 0; 
     int diagThreadCount = 0; 

     var jobs = Enumerable.Range(0, 160); 
     ParallelOptions po = new ParallelOptions 
     { 
     MaxDegreeOfParallelism = Environment.ProcessorCount 
     }; 
     Parallel.ForEach(jobs, po, delegate(int jobNr) 
     //Parallel.ForEach(jobs, delegate(int jobNr) 
     { 
     int threadsRemaining = Interlocked.Increment(ref concurrentThreads); 

     int heavyness = jobNr % 9; 

     //Give the processor and the garbage collector something to do... 
     List<Node> nodes = new List<Node>(); 
     Node current = null; 
     //for (int y = 0; y < 1024 * 1024 * heavyness; y++) 
     for (int y = 0; y < 1024 * 24 * heavyness; y++) 
     { 
      current = new Node(current); 
      nodes.Add(current); 
     } 
     //******************************* 
     directThreadsCount = Process.GetCurrentProcess().Threads.Count; 
     //******************************* 
     threadsRemaining = Interlocked.Decrement(ref concurrentThreads); 
     Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}", 
      jobNr, threadsRemaining, directThreadsCount); 
     }); 
     Console.WriteLine("FINISHED"); 
     Console.ReadLine(); 
    } 
    } 
} 
Verwandte Themen