2017-05-01 1 views
3

Ich habe eine einzige Methode, die einen Dateinamen eines Bildes und verarbeitet das Bild (CPU-intensiv), dann lädt es in BLOB-Speicher (async IO). Hier ist eine Zusammenfassung der Methode:Welcher dieser Ansätze zur Parallelisierung einer asynchronen Methode ist am besten geeignet?

public async Task<ImageJob> ProcessImage(String fileName) { 

    Byte[] imageBytes = await ReadFileFromDisk(fileName).ConfigureAwait(false); // IO-bound 

    Byte[] processedImage = RunFancyAlgorithm(imageBytes); // CPU-bound 

    Uri blobUri = await this.azureBlobClient.UploadBlob(processedImage).ConfigureAwait(false); // IO-bound 

    return new ImageJob(blobUri); 
} 

Der andere Teil meines Programms erhält eine Liste von Tausenden von Dateinamen verarbeitet werden.

Wie kann ich meine Methode ProcessImage so aufrufen, dass die verfügbare IO- und CPU-Leistung maximal genutzt wird?

Ich habe sechs verschiedene Arten (bisher) identifiziert meine Methode zum Aufruf - aber ich bin nicht sicher, was das Beste ist:

String[] fileNames = GetFileNames(); // typically contains thousands of filenames 

// Approach 1: 
{ 
    List<Task> tasks = fileNames 
     .Select(fileName => ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

// Approach 2: 
{ 
    List<Task> tasks = fileNames 
     .Select(async fileName => await ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

// Approach 3: 
{ 
    List<Task> tasks = new List<Task>(); 
    foreach(String fileName in fileNames) 
    { 
     Task imageTask = ProcessImage(fileName); 
     tasks.Add(imageTask); 
    } 

    await Task.WhenAll(tasks); 
} 

// Approach 4 (Weirdly, this gives me this warning: CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call." 
// ...even though I don't use an async lambda in the previous 3 examples, why is Parallel.ForEach so special? 
{ 
    ParallelLoopResult parallelResult = Parallel.ForEach(fileNames, fileName => ProcessImage(fileName)); 
} 

// Approach 5: 
{ 
    ParallelLoopResult parallelResult = Parallel.ForEach(fileNames, async fileName => await ProcessImage(fileName)); 
} 

// Approach 6: 
{ 
    List<Task> tasks = fileNames 
     .AsParallel() 
     .Select(fileName => ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 

// Approach 7: 
{ 
    List<Task> tasks = fileNames 
     .AsParallel() 
     .Select(async fileName => await ProcessImage(fileName)) 
     .ToList(); 

    await Task.WhenAll(tasks); 
} 
+0

je mischen async/erwarten und und 'Parallel.ForEach' sie sind unvereinbar, Es bewirkt, dass Sie eine Async-Void-Funktion generieren. Sie müssen [TPL stattdessen] (https://msdn.microsoft.com/en-us/library/dd460717 (v = vs.110) .aspx) –

+0

für Ihre Warnmeldung (Ansatz 4) verwenden. Hier finden Sie eine Erklärung: https://blogs.msdn.microsoft.com/ericlippert/2010/11/11/asynchronyin-in-c-5-part-six-whither-async/. Kurz gesagt, das 'async' Schlüsselwort aktiviert das' await' Schlüsselwort, dh wenn Sie das 'await' Schlüsselwort verwenden möchten, muss Ihre Methode' async' sein, das bedeutet nicht, dass Sie 'wait' nicht verwenden müssen, wenn Ihre Methode ist nicht "asynchron". –

+0

@KhanhTO Ich weiß das - aber aus irgendeinem Grund warnt der Compiler mich in Ansatz 4, warnt mich aber nicht in Ansatz 1, obwohl das Lambda identisch ist. – Dai

Antwort

3

Es klingt wie Sie haben viele Einzelteile, die verarbeitet werden müssen genau so. Als @StephenCleary erwähnt TPL Dataflow ist großartig für die Art des Problems. Ein großes Intro finden Sie here. Der einfachste Weg wäre für den Anfang nur ein paar Blöcke mit Ihrem Haupt TransformBlock Ausführung ProcessImage Hier ist ein einfaches Beispiel für den Einstieg:

public class ImageProcessor { 

    private TransformBlock<string, ImageJob> imageProcessor; 
    private ActionBlock<ImageJob> handleResults; 

    public ImageProcessor() { 
     var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 1000, 
      MaxDegreeOfParallelism = Environment.ProcessorCount 
     }; 
     imageProcessor = new TransformBlock<string, ImageJob>(fileName => ProcessImage(fileName), options); 
     handleResults = new ActionBlock<ImageJob>(job => HandleResults(job), options); 
     imageProcessor.LinkTo(handleResults, new DataflowLinkOptions() { PropagateCompletion = true });   
    } 

    public async Task RunData() { 
     var fileNames = GetFileNames(); 
     foreach (var fileName in fileNames) { 
      await imageProcessor.SendAsync(fileName); 
     } 
     //all data passed into pipeline 
     imageProcessor.Complete(); 
     await imageProcessor.Completion; 
    } 

    private async Task<ImageJob> ProcessImage(string fileName) { 
     //Each of these steps could also be separated into discrete blocks 

     var imageBytes = await ReadFileFromDisk(fileName).ConfigureAwait(false); // IO-bound 

     var processedImage = RunFancyAlgorithm(imageBytes); // CPU-bound 

     var blobUri = await this.azureBlobClient.UploadBlob(processedImage).ConfigureAwait(false); // IO-bound 

     return new ImageJob(blobUri); 
    } 

    private void HandleResults(ImageJob job) { 
     //do something with results 
    } 
} 
Verwandte Themen