2

Ich beschreibe mein Problem in einem einfachen Beispiel und beschreibe dann ein engeres Problem.Parallele Ausführung von Aufgaben in Gruppen

Stellen Sie sich vor Wir haben n Elemente [i1, i2, i3, i4, ..., in] in der Box1 und wir haben eine Box2, die m Elemente verarbeiten kann (m ist normalerweise viel kleiner als n). Die für jedes Element erforderliche Zeit ist unterschiedlich. Ich möchte immer m Job Items machen, bis alle Items bearbeitet sind.

Ein viel engeres Problem ist, dass Sie zum Beispiel eine Liste1 von n Zeichenfolgen (URL-Adressen) von Dateien haben und wir ein System haben wollen, um m Dateien gleichzeitig herunterzuladen (zum Beispiel via httpclient.getAsync() Methode) . Immer wenn das Herunterladen eines der m Elemente beendet wird, muss ein anderes verbleibendes Element von list1 so schnell wie möglich ersetzt werden. Dies muss so lange fortgesetzt werden, bis alle Elemente von List1 ausgeführt wurden. (Anzahl der n und m werden von Benutzern zur Laufzeit angegeben)

Wie kann dies getan werden?

Antwort

1

Prozesselemente parallel die Anzahl der gleichzeitig Arbeitsplätze zu beschränken:

string[] strings = GetStrings(); // Items to process. 
const int m = 2; // Max simultaneous jobs. 

Parallel.ForEach(strings, new ParallelOptions {MaxDegreeOfParallelism = m}, s => 
{ 
    DoWork(s); 
}); 
+2

Seine DoWork async ist, und Parallel.ForEach unterstützt async nicht. –

+0

Diese Methode funktioniert nicht in meinem Problem. weil Sie Parallel.ForEach nicht mit asynchronen Methoden verwenden können. Im Fall der Verwendung von Parallel.ForEach mit asynchronen Methoden werden alle Aufgaben sofort ausgelöst (es wartet nicht bis zum Abschluss der asynchronen Aufgaben). Ich verwende HttpClient.getAsync, eine asynchrone Methode. –

6

Sie sollten die System.Threading.Tasks.Dataflow NuGet Paket zu einem Projekt suchen Sie dann in zu TPL Dataflow, fügen Sie, was Sie wollen so einfach wie

private static HttpClient _client = new HttpClient(); 
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris, 
                int concurrentDownloads) 
{ 
    var result = new List<MyClass>(); 

    var downloadData = new TransformBlock<string, string>(async uri => 
    { 
     return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method. 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads}); 

    var processData = new TransformBlock<string, MyClass>(
      json => JsonConvert.DeserializeObject<MyClass>(json), 
      new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded}); 

    var collectData = new ActionBlock<MyClass>(
      data => result.Add(data)); //When you don't specifiy options dataflow processes items one at a time. 

    //Set up the chain of blocks, have it call `.Complete()` on the next block when the current block finishes processing it's last item. 
    downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true}); 
    processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true}); 

    //Load the data in to the first transform block to start off the process. 
    foreach (var uri in uris) 
    { 
     await downloadData.SendAsync(uri).ConfigureAwait(false); 
    } 
    downloadData.Complete(); //Signal you are done adding data. 

    //Wait for the last object to be added to the list. 
    await collectData.Completion.ConfigureAwait(false); 

    return result; 
} 

In dem obigen Code nur concurrentDownloads Anzahl von HttpClients wird zu jeder beliebigen Zeit aktiv sein, unbegrenzte Threads werden die empfangenen Strings verarbeiten und sie in Objekte konvertieren, und ein einzelner Thread wird thos nehmen Objekte und fügen Sie sie einer Liste hinzu.

UPDATE: hier ist ein vereinfachtes Beispiel, dass nur das tut, was sie in der Frage gestellt ist

private static HttpClient _client = new HttpClient(); 
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads) 
{ 
    var downloadData = new ActionBlock<string>(async uri => 
    { 
     var response = await _client.GetAsync(uri); //GetAsync is a thread safe method. 
     //do something with response here. 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads}); 


    foreach (var uri in uris) 
    { 
     downloadData.Post(uri); 
    } 
    downloadData.Complete(); 

    downloadData.Completion.Wait(); 
} 
+0

Danke, Bruder.Ich habe viel über TPL oder Reactive Extension bei der Suche nach einer Antwort für mein Problem gehört und gesehen, aber es ist so kompliziert für mich und ich verstehe nicht, wie man es benutzt. Gibt es dafür nicht eine einfachere Lösung? :) –

+0

Dataflow es ziemlich einfach zu tun, sobald Sie erkennen, dass Sie nur Schritte in einer Pipeline einrichten. Ich habe mein Beispiel zu kompliziert gemacht, damit ich Ihnen alle Funktionen von TPL DataFlow zeigen kann. Ich habe ein Beispiel für Ihre Anforderungen erstellt. –

+0

'HttpClient' ist für die Wiederverwendung für mehrere Anfragen vorgesehen, auch für gleichzeitige. Erstellen Sie eine Instanz und verwenden Sie diese, erstellen Sie nicht jedes Mal eine neue –

6

Hier ist eine generische Methode, die Sie verwenden können.

Wenn Sie dies aufrufen, wird TIn eine Zeichenfolge (URL-Adressen) sein und der asyncProcessor wird Ihre asynchrone Methode sein, die die URL-Adresse als Eingabe verwendet und eine Aufgabe zurückgibt.

Der von dieser Methode verwendete SlimSemaphore wird in Echtzeit nur eine Anzahl von gleichzeitigen asynchronen E/A-Anforderungen zulassen, sobald die andere Anforderung ausgeführt wird. So etwas wie ein gleitendes Fenstermuster.

public static Task ForEachAsync<TIn>(
      IEnumerable<TIn> inputEnumerable, 
      Func<TIn, Task> asyncProcessor, 
      int? maxDegreeOfParallelism = null) 
     { 
      int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism; 
      SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount); 

      IEnumerable<Task> tasks = inputEnumerable.Select(async input => 
      { 
       await throttler.WaitAsync().ConfigureAwait(false); 
       try 
       { 
        await asyncProcessor(input).ConfigureAwait(false); 
       } 
       finally 
       { 
        throttler.Release(); 
       } 
      }); 

      return Task.WhenAll(tasks); 
     } 
+0

Danke. Es scheint süß. Ich muss es testen und seine Funktionsweise melden. –

+0

Entschuldigung, ich habe eine Frage. Erstellt es alle Aufgaben sofort und wartet darauf, dass die Reihenfolge der einzelnen Aufgaben in der Zeitleiste angezeigt wird, oder erstellt es Aufgaben, wann immer es für ihn Zeit und notwendig ist? –

+0

'Task.WhenAll' erstellt intern eine Liste für alle Aufgaben, also denke ich, dass sie alle sofort erstellen werden. –

2

Eine einfache Lösung für die Drosselung ist ein SemaphoreSlim.
EDIT
Nach einer leichten Änderung erstellt der Code nun die Aufgaben, wenn sie benötigt werden

var client = new HttpClient(); 
SemaphoreSlim semaphore = new SemaphoreSlim(m, m); //set the max here 
var tasks = new List<Task>(); 

foreach(var url in urls) 
{ 
    // moving the wait here throttles the foreach loop 
    await semaphore.WaitAsync(); 
    tasks.Add(((Func<Task>)(async() => 
    { 
     //await semaphore.WaitAsync(); 
     var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here 
     // do something with response 
     semaphore.Release(); 
    }))()); 
} 

await Task.WhenAll(tasks); 

Dies ist ein weiterer Weg, es zu tun

var client = new HttpClient(); 
var tasks = new HashSet<Task>(); 

foreach(var url in urls) 
{ 
    if(tasks.Count == m) 
    { 
     tasks.Remove(await Task.WhenAny(tasks));    
    } 

    tasks.Add(((Func<Task>)(async() => 
    { 
     var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here 
     // do something with response    
    }))()); 
} 

await Task.WhenAll(tasks); 
+0

Es scheint den Job zu erledigen, es lädt sich simultan bei mot m urls herunter, hat aber ein Problem. Zum Beispiel, wenn Sie eine Liste von einer Million URLs haben, erstellt es eine Million Aufgaben in kurzer Zeit und wartet dann auf die Reihenfolge der einzelnen Aufgaben. Liege ich falsch? –

+0

Sie haben Recht, es schafft alle Aufgaben in kurzer Zeit. Es führt auch alles im selben Thread aus, aber Sie können dies mit 'ConfigureAwait (false)' ändern oder sie in einem Thread-Pool ausführen. Ich werde die Antwort mit einigen weiteren Informationen aktualisieren –

+0

Meine Liste der URLs kann so lang sein, kann Millionen sein, und wenn all diese Millionen Aufgabe erstellt werden, kann es dazu führen, dass nicht genügend Speicher, andere Ausnahmen oder Fehler :) Ich suche ein Lösung für jedes Teil, wann immer es erforderlich ist, mit wenig Speicherbedarf –

Verwandte Themen