2017-02-25 1 views
1

Ich versuche, 5000 Dateien asynchron zu verarbeiten, ohne den Threadpool uneingeschränkt wachsen. Die Parallel.For Schleife gibt mir jedoch keine konsistent korrekte Antwort (Anzahl kommt zu kurz), während die Task.Run ist.Parallel.For vs ThreadPool und async/erwarten

Was mache ich falsch in der Parallel.For Schleife, die diese falschen Antworten verursacht?

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

class Program 
{ 
    static volatile int count = 0; 
    static volatile int total = 0; 
    static void Main(string[] args) 
    { 
     Parallel.For(0, 5000, new ParallelOptions { MaxDegreeOfParallelism = 10 }, 
      async (index) => 
      { 
       string filePath = $"C:\\temp\\files\\out{index}.txt"; 
       var bytes = await ReadFileAsync(filePath); 
       Interlocked.Add(ref total, bytes.Length); 
       Interlocked.Increment(ref count); 
      }); 
     Console.WriteLine(count); 
     Console.WriteLine(total); 

     count = 0; 
     total = 0; 
     List<Task> tasks = new List<Task>(); 
     foreach (int index in Enumerable.Range(0, 5000)) 
     { 
      tasks.Add(Task.Run(async() => 
      { 
       string filePath = $"C:\\temp\\files\\out{index}.txt"; 
       var bytes = await ReadFileAsync(filePath); 
       Interlocked.Add(ref total, bytes.Length); 
       Interlocked.Increment(ref count); 
      })); 
     } 
     Task.WhenAll(tasks).Wait(); 
     Console.WriteLine(count); 
     Console.WriteLine(total); 
    } 
    public static async Task<byte[]> ReadFileAsync(string filePath) 
    { 
     byte[] bytes = new byte[4096]; 
     using (var sourceStream = new FileStream(filePath, 
       FileMode.Open, FileAccess.Read, FileShare.Read, 
       bufferSize: 4096, useAsync: true)) 
     { 
      await sourceStream.ReadAsync(bytes, 0, 4096); 
     }; 
     return bytes; 
    } 
} 
+0

Können Sie das konkretisieren über Ihre gewünschten Ergebnisse sein. Was meinst du mit _growing the thread pool unrestricted_ In deinem zweiten Beispiel hast du 5000 Aufgaben in der Warteschlange, aber es bedeutet nicht, dass sie fertig sind. In jedem Fall wird der flüchtige Stoff nur ohne eine Verriegelung benötigt. Welche Werte von "count" erwarten Sie in beiden Fällen? – JSteward

+0

Es scheint mir, dass das Lesen der gesamten Datei, nur um ihre Länge zu erhalten, sehr ineffizient ist. 'neue FileInfo (somePath) .Length' wäre viel effizienter (wenn auch nicht asynchron ... aber das Setup für' new FileStream' macht auch eine Menge synchrones Zeug einschließlich der Länge, so dass es kein Verlust ist) – spender

+1

Don ' t mix 'Parallel.For/ForEach' mit' async/await' – MickyD

Antwort

9

Parallel.For ist nicht async bewusst.

Als solche funktioniert die Parallel.For nicht wie erwartet. Da auf die vom asynchronen Lambda erzeugte Task nicht gewartet wird, werden alle Iterationen in der Zeit abgeschlossen, die erforderlich ist, um die Tasks zu erstellen und nicht zu vervollständigen.

nach dem Parallel.For, eine Anzahl von Iterationen wird immer noch eine anhängige Aufgabe hat, die noch nicht abgeschlossen ist, und daher Ihre Ergänzungen count und total noch nicht abgeschlossen.

Stephen Toub hat eine asynchrone Version von Parallel.ForEach implementiert. (ForEachAsync) Die Umsetzung wird wie folgt:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll(
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current); 
     })); 
} 

So könnten Sie Ihre Schleife umschreiben:

Enumerable.Range(0, 5000).ForEachAsync(10, async (index)=>{ 
    //$$$ 
}); 
+0

Interessant, ich hatte keine Ahnung, dass Sie Parallel.Für Async nicht kombinieren können. 'Da auf die vom asynchronen Lambda erzeugte Task nicht gewartet wird, werden alle Iterationen in der Zeit abgeschlossen, die zum Erstellen der Tasks benötigt wird, und nicht zum Abschluss. 'Wartet Task.WhenAll jedoch nicht auf den Abschluss aller Tasks ? – MvdD

+0

@MvdD Es tut es, obwohl es nur auf den ersten "Ausgang" von 'async' Methoden wartet. "Zurück" zurück zum "Async" kann nicht passieren. – VMAtm