2014-04-16 18 views
6

Ich habe einige hundert Dateien, die ich in Azure Blob Storage hochladen muss.
Ich möchte parallele Aufgabenbibliothek verwenden.
Aber anstatt alle 100 Threads zum Hochladen in eine foreach-Liste von Dateien auszuführen, wie kann ich die maximale Anzahl von Threads begrenzen, die es verwenden kann, und den Job parallel beenden. oder gleicht es die Dinge automatisch aus?Begrenzen der Anzahl der Threads in der Task Parallel Library

+1

Sie nicht verwenden sollten Fäden dafür überhaupt. Dafür gibt es eine 'Task'-basierte API, die natürlich asynchron ist: [CloudBlockBlob.UploadFromFileAsync] (http://msdn.microsoft.com/en-us/library/dn451828.aspx). Sind Sie auf VS2010 beschränkt und können 'async/await' nicht verwenden (also haben Sie die Frage mit" C# 4.0 "markiert)? – Noseratio

+0

Wenn ich mich richtig erinnere, verwendet es so viele Threads wie verfügbare Cores. Ich kann mich nicht erinnern, wo ich es gelesen habe. Könnte MS-Blog gewesen sein, oder eine Antwort auf SO, wenn ich mich gefragt habe, ob es notwendig ist. Sie könnten es einfach in einer Testanwendung tho mit einer Liste von 100 Ints versuchen, indem Sie Parallel verwenden. – Dbl

+1

@Noseratio nicht beschränkt auf VS2010 .. ich kann C# 5.0 auch verwenden .. lassen Sie mich als Tag .. – Seenu

Antwort

9

Sie sollten überhaupt keine Threads verwenden. Dafür gibt es eine Task -basierte API, die natürlich asynchron ist: CloudBlockBlob.UploadFromFileAsync. Verwenden Sie es mit async/await und SemaphoreSlim, um die Anzahl der parallelen Uploads zu drosseln.

Beispiel (nicht getestet):

const MAX_PARALLEL_UPLOADS = 5; 

async Task UploadFiles() 
{ 
    var files = new List<string>(); 
    // ... add files to the list 

    // init the blob block and 
    // upload files asynchronously 
    using (var blobBlock = new CloudBlockBlob(url, credentials)) 
    using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS)) 
    { 
     var tasks = files.Select(async(filename) => 
     { 
      await semaphore.WaitAsync(); 
      try 
      { 
       await blobBlock.UploadFromFileAsync(filename, FileMode.Create); 
      } 
      finally 
      { 
       semaphore.Release(); 
      } 
     }).ToArray(); 

     await Task.WhenAll(tasks); 
    } 
} 
2

Haben Sie versucht, MaxDegreeOfParallelism zu verwenden? Wie folgt aus:

System.Threading.Tasks.Parallel.Invoke(
new Tasks.ParallelOptions {MaxDegreeOfParallelism = 5 }, actionsArray) 
0

Sie können dies herausfinden, indem Sie:

class Program 
{ 
    static void Main(string[] args) 
    { 
     var list = new List<int>(); 

     for (int i = 0; i < 100; i++) 
     { 
      list.Add(i); 
     } 

     var runningIndex = 0; 

     Task.Factory.StartNew(() => Action(ref runningIndex)); 

     Parallel.ForEach(list, i => 
     { 
      runningIndex ++; 
      Console.WriteLine(i); 
      Thread.Sleep(3000); 
     }); 

     Console.ReadKey(); 
    } 

    private static void Action(ref int number) 
    { 
     while (true) 
     { 
      Console.WriteLine("worked through {0}", number); 
      Thread.Sleep(2900); 
     } 
    } 
} 

Wie Sie die Anzahl der Parallelität sehen kann, ist zu Beginn kleiner, wird größer und wächst gegen Ende kleiner. Es gibt also eine Art automatische Optimierung.

0

im Wesentlichen eine Aktion oder Aufgabe für jede Datei zu laden, setzen Sie sie in einer Liste, wollen Sie gehen erstellen und dann diese Liste bearbeiten, um die Anzahl zu begrenzen das kann parallel verarbeitet werden.

My blog post zeigt, wie dies sowohl mit Aufgaben als auch mit Aktionen durchgeführt wird, und bietet ein Beispielprojekt, das Sie herunterladen und ausführen können, um beides in Aktion zu sehen.

Mit Aktionen

Bei der Verwendung von Aktionen können Sie die integrierten .NET Parallel.Invoke-Funktion verwenden. Hier beschränken wir uns darauf, maximal 5 Threads parallel zu laufen.

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 5}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Diese Option nicht, obwohl die Verwendung des async Natur UploadFromFileAsync machen, so dass Sie möglicherweise den Task-Beispiel unten verwenden möchten.

Mit Aufgaben

Mit Aufgaben gibt es keine integrierte Funktion. Sie können jedoch das verwenden, das ich in meinem Blog zur Verfügung stelle.

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

Und dann die Liste der Aufgaben Erstellen und Aufrufen der Funktion, sie laufen zu lassen, mit maximal 5 gleichzeitige sagen zu einer Zeit, können Sie dies tun:

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 5); 
Verwandte Themen