2013-08-22 13 views
7

Also Thema sind die Fragen.Wie die AsParallel-Erweiterung funktioniert

ich diese Methode AsParallel Wrapper liefert ParallelQuery<TSource>, die die gleichen LINQ Schlüsselwörter verwendet, sondern von System.Linq.ParallelEnumerable statt System.Linq.Enumerable

klar Es ist genug, aber wenn ich in dekompilierten Quellen suchen, ich verstehe nicht, wie funktioniert Es klappt.

Beginnen wir mit einer einfachen Erweiterung: Sum() -Methode. Code:

[__DynamicallyInvokable] 
public static int Sum(this ParallelQuery<int> source) 
{ 
    if (source == null) 
    throw new ArgumentNullException("source"); 
    else 
    return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate(); 
} 

es klar ist, lassen Sie uns zu Aggregate() Methode. Es ist ein Wrapper für InternalAggregate-Methode, die einige Ausnahmen festhält. Sehen wir uns das jetzt an.

protected override int InternalAggregate(ref Exception singularExceptionToThrow) 
{ 
    using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true)) 
    { 
    int num = 0; 
    while (enumerator.MoveNext()) 
     checked { num += enumerator.Current; } 
    return num; 
    } 
} 

und hier ist die Frage: wie es funktioniert? Ich sehe keine Konkurrenzsicherheit für eine Variable, die von vielen Threads modifiziert wird, wir sehen nur Iterator und Summierung. Ist es ein magischer Enumerator? Oder wie funktioniert es? GetEnumerator() gibt QueryOpeningEnumerator<TOutput> zurück, aber der Code ist zu kompliziert.

Antwort

2

schließlich in meinem zweiten PLINQ Angriff ich eine Antwort gefunden. Und es ist ziemlich klar. Problem ist, dass Enumerator nicht einfach ist. Es ist eine spezielle multithreading eins. So wie es funktioniert? Antwort ist, dass enumerator keinen nächsten Wert der Quelle zurückgibt, es gibt eine ganze Summe der nächsten Partition zurück. Dieser Code wird also nur 2,4,6,8 ... mal (basierend auf Environment.ProcessorCount) ausgeführt, wenn die eigentliche Summierungsarbeit ausgeführt wird innerhalb enumerator.MoveNext in enumerator.OpenQuery Methode.

Also partitionieren TPL die Quelle aufzählbar, summieren dann unabhängig jede Partition und erstellen dann diese Summierung, siehe IntSumAggregationOperatorEnumerator<TKey>. Keine Magie hier, könnte einfach tiefer tauchen.

1

Der Operator Sum aggregiert alle Werte in einem einzigen Thread. Hier gibt es kein Multi-Threading. Der Trick ist, dass Multithreading irgendwo anders stattfindet.

Die PLINQ Sum-Methode kann PLINQ-Enumerables verarbeiten. Diese Aufzählungen können mithilfe anderer Konstrukte (z. B. where) erstellt werden, mit denen eine Sammlung über mehrere Threads verarbeitet werden kann.

Der Sum-Operator ist immer der letzte Operator in einer Kette. Obwohl es möglich ist, diese Summe über mehrere Threads zu verarbeiten, hat das TPL-Team wahrscheinlich herausgefunden, dass dies einen negativen Einfluss auf die Performance hatte, was vernünftig ist, da das einzige, was diese Methode zu tun hat, eine einfache Ganzzahladdition ist.

Diese Methode verarbeitet also alle Ergebnisse, die von anderen Threads verfügbar sind, verarbeitet sie in einem einzelnen Thread und gibt diesen Wert zurück. Der wahre Trick ist in anderen PLINQ-Erweiterungsmethoden.

+1

wenn "Es gibt hier kein Multithreading" Warum berechnet sum2 3 mal schneller als sum1? http://i.imgur.com/Z4CtyUz.png –

+0

@AlexJoukovsky: Ich testete dies und die 'AsParallel'-Version ist zwar schneller, aber ich habe keine Ahnung, wie das möglich ist, zumal die Eingabeaufzählung auch auf einem einzigen verarbeitet wird thread auf einmal. Bizarr. – Steven

+0

Partialsummen könnten parallel verarbeitet und dann kombiniert werden. – usr

-2
protected override int InternalAggregate(ref Exception singularExceptionToThrow) 
{ 
    using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions? (ParallelMergeOptions.FullyBuffered), true)) 
    { 
    int num = 0; 
    while (enumerator.MoveNext()) 
     checked { num += enumerator.Current; } 
    return num; 
    } 
} 

Dieser Code wird nicht parallel ausgeführt, die while wird sequentiell ausgeführt werden es ist innenscope.

diesen Versuchen Sie stattdessen

 List<int> list = new List<int>(); 

     int num = 0; 

     Parallel.ForEach(list, (item) => 
      { 
       checked { num += item; } 
      }); 

Die innere Aktion wird auf dem Threadpool und die foreach-Anweisung wird vollständig verteilt werden, wenn alle Elemente behandelt werden.

Hier müssen Sie threadsafety:

 List<int> list = new List<int>(); 

     int num = 0; 

     Parallel.ForEach(list, (item) => 
      { 
       Interlocked.Add(ref num, item); 
      }); 
+3

Ist nicht mein Code, es ist dekompilierte Quellen von .Net Framework 4.5, und es ** funktioniert ** –

+0

Natürlich funktioniert es, außer es ist nicht Multithread. Es wird in ein IEnumerable umgewandelt? _IntSumAggregationOperator ((IEnumerable ) source_ So wird immer noch sequentiell von der while ausgeführt.Ich glaube nicht, gibt es keinen Grund zu Summe Multithread, weil sie alle das Ergebnis beeinflussen –

+0

Abwesenheit von Boxen kann nicht Grund für 3x Perfomance Unterschied sein –