2013-02-12 8 views
9

Ich bin nicht sicher über die Verwendung der lokalen init-Funktion in Parallel.ForEach, wie in dem MSDN-Artikel beschrieben: http://msdn.microsoft.com/en-us/library/dd997393.aspxWie funktioniert die lokale Initialisierung mit Parallel ForEach?

Parallel.ForEach<int, long>(nums, // source collection 
    () => 0, // method to initialize the local variable 
    (j, loop, subtotal) => // method invoked by the loop on each iteration 
    { 
     subtotal += nums[j]; //modify local variable 
     return subtotal; // value to be passed to next iteration 
    },... 

Wie funktioniert() => 0 alles initialisieren? Wie lautet der Name der Variablen und wie kann ich sie in der Schleifenlogik verwenden?

+0

() => Initialisiert nichts, Rückgabewert dieser Funktion wird verwendet, um die lokale Variable (Zwischensumme, in Ihrem Beispiel) zu initialisieren. –

Antwort

19

Mit Bezug auf die following overload der Parallel.ForEach statische Erweiterungsmethode:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source, 
    Func<TLocal> localInit, 
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody, 
    Action<TLocal> localFinally 
) 

In Ihrem speziellen Beispiel

Die Linie:

() => 0, // method to initialize the local variable 

ist einfach eine Lambda (anonyme Funktion), die die konstante ganze Zahl Null zurückgibt.Dieses lambda wird als localInit Parameter Parallel.ForEach bestanden - da das Lambda eine ganze Zahl zurückgibt, hat es Func<int> Typen und Typen TLocal kann als int vom Compiler (in ähnlicher Weise abgeleitet werden, TSource kann von dem Typ der als Parameter übergebenen Sammlung gefolgert werden source)

Der Rückgabewert (0) wird dann als 3. Parameter (subtotal) an taskBodyFunc übergeben.

(j, loop, subtotal) => 
{ 
    subtotal += nums[j]; //modify local variable (Bad idea, see comment) 
    return subtotal;  // value to be passed to next iteration 
} 

Dieses zweite Lambda (bestanden taskBody) wird N-mal genannt, wobei N die Anzahl der Elemente dieser Aufgabe durch die TPL Partitionierer zugewiesen: Dieser (0) den Anfangskeim für den Körper Schleife verwendet.

Jeder nachfolgende Aufruf der zweiten taskBody lambda wird eine Lauf Teil Gesamt Berechnung für diese Aufgabe den neuen Wert von subTotal, effektiv weitergeben. Nachdem alle der Aufgabe zugewiesenen Elemente hinzugefügt wurden, wird der dritte und letzte, localFinally Funktionsparameter aufgerufen, der wiederum den letzten Wert subtotal liefert, der von taskBody zurückgegeben wird. Da mehrere dieser Aufgaben parallel ausgeführt werden, muss auch ein abschließender Schritt durchgeführt werden, um alle Teilergebnisse in die endgültige Gesamtsumme aufzuneh- men. Da jedoch mehrere gleichzeitige Aufgaben (in verschiedenen Threads) um die Variable grandTotal konkurrieren können, ist es wichtig, dass Änderungen daran threadsicher vorgenommen werden.

(Ich habe geändert Namen der MSDN-Variablen, um es klar)

long grandTotal = 0; 
Parallel.ForEach(nums,   // source collection 
() => 0,      // method to initialize the local variable 
    (j, loop, subtotal) =>   // method invoked by the loop on each iteration 
    subtotal + nums[j],   // value to be passed to next iteration subtotal 
    // The final value of subtotal is passed to the localFinally function parameter 
    (subtotal) => Interlocked.Add(ref grandTotal, subtotal) 

Im MS Beispiel, Änderung des Parameters Wert Ihrer innerhalb der Task Körper ist eine schlechte Praxis und unnötig. dh Der Code subtotal += nums[j]; return subtotal; wäre besser, als nur return subtotal + nums[j]; die mit der Lambda-Stenografie Projektion abgekürzt werden könnte (j, loop, subtotal) => subtotal + nums[j]

Allgemeines

Die localInit/body/localFinally Überlastungen von Parallel.For/Parallel.ForEach erlauben einmal pro Aufgabe Initialisierung und Bereinigungscode zu ausgeführt werden, vor und nach (bzw.) taskBody Iterationen werden von der Task durchgeführt.

(unter Hinweis auf die für Entfernungs-/Enumerable vorbei zum parallelen For/Foreach werden in Chargen von IEnumerable<> aufgeteilt werden, von denen jeder einer Aufgabe zugeordnet wird)

In jede Aufgabe wird localInit einmal aufgerufen werden, wiederholt aufgerufen, sobald der body Code pro Stück in Charge (0..N mal) wird, und localFinally wird, sobald nach Abschluss aufgerufen werden.

Darüber hinaus können Sie ein beliebigen Zustand für die Dauer der Aufgabe erforderlich passieren (das heißt zu den taskBody und localFinally Delegierten) über einen generischen TLocal Rückgabewert der localInit Func - ich habe diese Variable taskLocals unten genannt.

Gemeinsame Nutzung von „localInit“:

  • Erstellen und teure Ressourcen durch den Schleifenkörper benötigt Initialisierung wie eine Datenbankverbindung oder eine Web-Service-Verbindung.
  • Keeping Aufgaben Lokale Variablen zu halten (uncontended) laufende Summen oder Sammlungen
  • Wenn Sie mehrere Objekte aus localInit zum taskBody und localFinally zurückkehren müssen, können Sie die Verwendung einer stark typisierte Klasse machen, ein Tuple<,,> oder, wenn Sie verwenden nur Lambdas für die localInit/taskBody/localFinally, Sie können auch Daten über eine anonyme Klasse übergeben. Hinweis: Wenn Sie die Rückgabe von localInit verwenden, um einen Referenztyp für mehrere Aufgaben zu verwenden, müssen Sie die Threadsicherheit für dieses Objekt berücksichtigen - Unveränderlichkeit ist vorzuziehen.

Gemeinsame Nutzung der "localFinally" Aktion:

  • Ressourcen freizugeben, wie IDisposables in den taskLocals (zB Datenbankverbindungen, Datei-Handles, Web-Service-Clients, etc.)
  • Zusammenfassen/Kombinieren/Reduzieren der von jeder Aufgabe ausgeführten Arbeit zurück in gemeinsame Variablen. Diese geteilten Variablen werden bestritten, so dass die Thread-Sicherheit ein Problem darstellt:
    • z.B. Interlocked.Increment auf primitive Typen wie ganze Zahlen
    • lock oder ähnliches wird für Schreiboperationen
    • Nutzen Sie die concurrent collections sparen Zeit und Aufwand erforderlich.

Die taskBody ist der tight Teil des Loop-Betrieb - Sie werden diese für die Leistung optimieren möchten.

Dies alles wird am besten mit einem kommentierten Beispiel zusammengefasst:

public void MyParallelizedMethod() 
{ 
    // Shared variable. Not thread safe 
    var itemCount = 0; 

    Parallel.For(myEnumerable, 
    // localInit - called once per Task. 
    () => 
    { 
     // Local `task` variables have no contention 
     // since each Task can never run by multiple threads concurrently 
     var sqlConnection = new SqlConnection("connstring..."); 
     sqlConnection.Open(); 

     // This is the `task local` state we wish to carry for the duration of the task 
     return new 
     { 
      Conn = sqlConnection, 
      RunningTotal = 0 
     } 
    }, 
    // Task Body. Invoked once per item in the batch assigned to this task 
    (item, loopState, taskLocals) => 
    { 
     // ... Do some fancy Sql work here on our task's independent connection 
     using(var command = taskLocals.Conn.CreateCommand()) 
     using(var reader = command.ExecuteReader(...)) 
     { 
     if (reader.Read()) 
     { 
      // No contention for `taskLocal` 
      taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]); 
     } 
     } 
     // The same type of our `taskLocal` param must be returned from the body 
     return taskLocals; 
    }, 
    // LocalFinally called once per Task after body completes 
    // Also takes the taskLocal 
    (taskLocals) => 
    { 
     // Any cleanup work on our Task Locals (as you would do in a `finally` scope) 
     if (taskLocals.Conn != null) 
     taskLocals.Conn.Dispose(); 

     // Do any reduce/aggregate/synchronisation work. 
     // NB : There is contention here! 
     Interlocked.Add(ref itemCount, taskLocals.RunningTotal); 
    } 

Und weitere Beispiele:

Example of per-Task uncontended dictionaries

Example of per-Task database connections

+0

Entfernen Sie das DB-Beispiel und ersetzen Sie es durch ein an der CPU gebundenes Beispiel. Die Verwendung von Parallel.For mit Datenbanken ist wahrscheinlich keine gute Idee - async/warte und Parallelität durch 'Task.WhenAll' ist vorzuziehen. – StuartLC

2

Sie können einen Hinweis auf MSDN in der correct Parallel.ForEach Überlastung erhalten.

Der localInit delegieren wird für jeden Thread einmal aufgerufen, die bei der Ausführung der Schleife beteiligt und gibt den Ausgang lokalen Zustand für jede dieser Aufgaben. Diese Anfangszustände werden an die ersten Körperaufrufe bei jeder Aufgabe übergeben. Dann gibt jeder nachfolgende Körperaufruf einen möglicherweise geänderten Statuswert zurück, der an den nächsten Körperaufruf übergeben wird.

In Ihrem Beispiel () => 0 ist ein Delegierter 0 gerade Rückkehr, so wird dieser Wert für die erste Iteration auf jede Aufgabe verwendet.

+0

Ich hatte diesen Text vor dem Posten hier gelesen, aber meine Frage bleibt die gleiche. Wie liefert mir die Verwendung eines Lambda, das nur 0 zurückgibt, einen Wert, der "für die erste Iteration bei jeder Aufgabe verwendet wird"? Wie kann ich diesen Wert im Code verwenden? Es hat keine Kennung. –

+3

Es hat - es ist die "Zwischensumme" in Ihrem Beispiel. Aber nur in der ersten Iteration in jeder Aufgabe. Der Wert von "Zwischensumme" in allen anderen Iterationen in dieser Aufgabe entspricht dem Ergebnis der vorherigen Iteration. –

+0

Ok, ich folge dir so weit, aber welcher Teil dieses Codes zeigt an, dass "() => 0" = Zwischensumme?Wie lautet die Konvention zwischen der init-Funktion und der (j, loop, subtotal) -Deklaration? Edit: Ich habe es gerade herausgefunden, der Teufel steckt in den Details, oder in diesem Fall, in den Namen der generischen Parameter der Körperfunktion System.Func . Danke, ich werde deine als Antwort markieren. –

6

als eine Erweiterung zu @Honza Brestan 's Antwort. Die Art und Weise, wie Parallel foreach die Arbeit in Aufgaben aufteilt, kann auch wichtig sein, es werden mehrere Schleifeniterationen in einer einzigen Aufgabe zusammengefasst, so dass in der Praxis localInit() einmal für jede n Iterationen der Schleife aufgerufen wird und mehrere Gruppen gleichzeitig gestartet werden können.

Der Punkt eines localInit und localFinally ist, um sicherzustellen, dass eine parallele foreach-Schleife ergibt sich aus jeder itteration zu einem einzigen Ergebnis kombinieren können, ohne dass Sie Lock-Anweisungen in der body, zu spezifizieren, um dies zu tun, müssen Sie eine Initialisierung für das zur Verfügung stellen Wert, den Sie erstellen möchten (localInit), dann kann jede bodyIntegration den lokalen Wert verarbeiten, dann stellen Sie eine Methode zum Kombinieren von Werten aus jeder Gruppe (localFinally) auf thread-sichere Weise bereit.

Wenn Sie localInit nicht zum Synchronisieren von Aufgaben benötigen, können Sie mit lambda-Methoden problemlos auf Werte aus dem umgebenden Kontext verweisen. Siehe Threading in C# (Parallel.For and Parallel.ForEach) für ein ausführlicheres Tutorial über die Verwendung von localInit/Endlich und scrollen Sie nach unten zu Optimierung mit lokalen Werten, Joseph Albahari ist wirklich meine Quelle für alle Dinge Threading.

+0

Abstimmen das, weil dies wirklich Dinge für mich geklärt hat. Vielen Dank. –

0

Von meiner Seite ein wenig einfaches Beispiel

class Program 
{ 
    class Person 
    { 
     public int Id { get; set; } 
     public string Name { get; set; } 
     public int Age { get; set; } 
    } 

    static List<Person> GetPerson() => new List<Person>() 
    { 
     new Person() { Id = 0, Name = "Artur", Age = 26 }, 
     new Person() { Id = 1, Name = "Edward", Age = 30 }, 
     new Person() { Id = 2, Name = "Krzysiek", Age = 67 }, 
     new Person() { Id = 3, Name = "Piotr", Age = 23 }, 
     new Person() { Id = 4, Name = "Adam", Age = 11 }, 
    }; 

    static void Main(string[] args) 
    { 
     List<Person> persons = GetPerson(); 
     int ageTotal = 0; 

     Parallel.ForEach 
     (
      persons, 
      () => 0, 
      (person, loopState, subtotal) => subtotal + person.Age, 
      (subtotal) => Interlocked.Add(ref ageTotal, subtotal) 
     ); 

     Console.WriteLine($"Age total: {ageTotal}"); 
     Console.ReadKey(); 
    } 
} 
Verwandte Themen