2012-06-22 7 views
8

Kann jemand eine Erweiterungsfunktion schreiben, die eine ParallelQuery in PLINQ zurückgibt?Wie schreibe ich eine Thread-bewusste Erweiterungsfunktion für PLINQ?

Genauer gesagt habe ich das folgende Problem: Ich möchte eine Transformation innerhalb einer PLINQ-Abfrage durchführen, die eine Engine benötigt, deren Erstellung teuer ist und auf die nicht gleichzeitig zugegriffen werden kann.

konnte ich folgendes tun:

var result = source.AsParallel().Select ((i) => { var e = new Engine(); return e.Process(i); }) 

Hier wird der Motor einmal pro Element erstellt, das zu teuer ist.

Ich möchte die Engine einmal erstellt pro Thread.

Mit Aggregate, kann ich nahe kommen, was ich mit etwas wollen wie

// helper class: engine to use plus list of results obtained in thread so far 
class EngineAndResults { 
    public Engine engine = null; 
    public IEnumerable<ResultType> results; 
} 

var result = source.AsParallel().Aggregate (

    // done once per block of items (=thread), 
    // returning an empty list, but a new engine 
    () => new EngineAndList() { 
     engine = new Engine(), 
     results = Enumerable.Empty<ResultType>() 
    }, 

    // we process a new item and put it to the thread-local list, 
    // preserving the engine for further use 
    (engineAndResults, item) => new EngineAndResults() { 
     engine = engineAndResults.engine, 
     results = Enumerable.Concat (
      engineAndResults.results, 
      new ResultType [] { engineAndResults.engine.Process (item) } 
     ) 
    }, 

    // tell linq how to aggregate across threads 
    (engineAndResults1, engineAndResults2) => new EngineAndResults() { 
     engine = engineAndResults1.engine, 
     results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results) 
    }, 

    // after all aggregations, how do we come to the result? 
    engineAndResults => engineAndResults.results 
); 

Wie Sie sehen können, mißbrauchen ich den Akkumulator einen Motor pro Thread zu tragen. Das Problem hierbei ist, dass PLINQ am Ende die Ergebnisse in einem einzigen IEnumerable aggregiert, wodurch die Threads synchronisiert werden. Das ist nicht sehr schön, wenn ich danach noch eine PLINQ-Erweiterung anhängen möchte.

würde ich so etwas wie

var result = source.AsParallel() 
        .SelectWithThreadwiseInitWhichIAmLookingFor (
         () => new Engine(), 
         (engine, item) => engine.Process (item) 
      ) 

Hat jemand eine Idee schätzen, wie dies zu erreichen?

Antwort

5

Sie könnten dazu ThreadLocal<T> verwenden. Etwas wie:

var engine = new ThreadLocal<Engine>(() => new Engine()); 
var result = source.AsParallel() 
        .Select(item => engine.Value.Process(item)); 
+0

Vielen Dank. Dies ist eine gute Lösung. Ich habe einen kurzen Test gemacht und es scheint gut zu funktionieren. Ich habe versucht, einen Weg zu finden, die Initialisierung in die Extension-Funktion zu bringen, aber es ist nicht gelungen - offensichtlich muss ThreadLocal erstellt werden, bevor AsParallel aufgerufen wird. Ich sehe den Grund dafür nicht, aber das ist kein großes Problem. – JohnB

+0

Ich denke, das hat nicht funktioniert, weil Sie für jede Iteration ein neues 'ThreadLocal' erstellt haben, so dass es keine Freigabe für Iterationen geben konnte, die auf demselben Thread ausgeführt werden. Alle Iterationen, die auf demselben Thread ausgeführt werden, benötigen die gleiche Instanz von 'ThreadLocal'. – svick

Verwandte Themen