2015-11-25 8 views
17

richtig in Warteschlange Ich habe eine Aufzählung von Elementen (RunData.Demand), die jeweils einige Arbeit darstellen Aufruf einer API über HTTP. Es funktioniert gut, wenn ich nur foreach über alles und rufen Sie die API während jeder Iteration. Allerdings dauert jede Iteration ein oder zwei Sekunden, also würde ich gerne 2-3 Threads ausführen und die Arbeit zwischen ihnen aufteilen. Hier ist, was ich tue:Wie ordne ich Aufgaben in C#

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads 
var tasks = RunData.Demand 
    .Select(service => Task.Run(async delegate 
    { 
     var availabilityResponse = await client.QueryAvailability(service); 
     // Do some other stuff, not really important 
    })); 

await Task.WhenAll(tasks); 

Der client.QueryAvailability Aufruf ruft im Grunde eine API mit der HttpClient Klasse:

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request) 
{ 
    var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request); 

    if (response.IsSuccessStatusCode) 
    { 
     return await response.Content.ReadAsAsync<QueryAvailabilityResponse>(); 
    } 

    throw new HttpException((int) response.StatusCode, response.ReasonPhrase); 
} 

Dies funktioniert gut für eine Weile, aber schließlich die Dinge beginnen Timing aus. Wenn ich das HttpClient Timeout auf eine Stunde setze, bekomme ich seltsame interne Serverfehler.

Was ich begann zu tun war eine Stoppuhr innerhalb der QueryAvailability Methode, um zu sehen, was los war.

Was passiert, ist alle 1200 Elemente in RunData.Demand werden auf einmal erstellt und alle 1200 await client.PostAsJsonAsync Methoden aufgerufen werden. Es scheint, dass es dann die 2 Threads verwendet, um die Aufgaben langsam zu überprüfen, so dass ich zum Ende Aufgaben habe, die auf 9 oder 10 Minuten gewartet haben.

Hier ist das Verhalten Ich mag würde:

Ich mag würde die 1200 Aufgaben erstellen und sie dann 3-4 zu einem Zeitpunkt ausgeführt als Threads verfügbar sind. Ich mache nicht wollen 1.200 HTTP-Anrufe sofort anstehen.

Gibt es einen guten Weg, dies zu tun?

+0

Sie scheinen keinen neuen "Client" für jeden Anruf zu erstellen. Sie wissen, dass 'System.Net.Http.HttpClient' für Instanzaufrufe nicht Thread-sicher ist? Für jeden Aufruf sollte eine neue Instanz erstellt (und danach angeordnet) werden. – Enigmativity

+0

Die QueryAvailability-Methode befindet sich tatsächlich in einer Klasse, die den HttpClient erstellt, der ein privates Mitglied dieser Instanz ist. Ich wusste nicht, dass es nicht Thread-sicher ist, aber ich könnte es definitiv vor jedem Anruf erstellen. Ich schaue mir das mehr an, danke! –

+0

Hmm, ich habe ein bisschen recherchiert und es scheint, dass ich threadsicher bin. Siehe [hier] (http://stackoverflow.com/questions/11178220/is-httpclient-safe-to-use-concurrently) und [hier] (http://www.tomdupont.net/2014/11/net- 45-httpclient-is-thread-safe.html) –

Antwort

17

Wie ich immer empfehlen .. was Sie brauchen, ist TPL Dataflow (zu installieren: Install-Package Microsoft.Tpl.Dataflow).

Sie erstellen eine ActionBlock mit einer Aktion für jedes Element auszuführen. Stellen Sie MaxDegreeOfParallelism für die Drosselung ein. Starten Sie in sie zu veröffentlichen und warten auf ihre Fertigstellung:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{ 
    var availabilityResponse = await client.QueryAvailability(service); 
    // ... 
}, 
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); 

foreach (var service in RunData.Demand) 
{ 
    block.Post(service); 
} 

block.Complete(); 
await block.Completion; 
+0

Das sieht auch vielversprechend aus und sieht so aus, als ob es gut mit 'async' Methoden funktioniert (zB kann ich' '' auf Block-Vervollständigung erwarten) .. Ich gebe das ein Schuss. –

+0

@MikeChristensen ist es. Es ist eine der wenigen Bibliotheken in .Net, die speziell mit async geschrieben wurden. – i3arnon

+2

Funktioniert einwandfrei! Ich bin jetzt ein Fan. Akzeptieren. –

3

Sie sind Asynchron-HTTP-Aufrufe verwenden, so die Anzahl der Threads Begrenzung nicht helfen (noch wird ParallelOptions.MaxDegreeOfParallelism in Parallel.ForEach als eine der Antworten schon sagt). Selbst ein einzelner Thread kann alle Anfragen initiieren und die Ergebnisse so verarbeiten, wie sie ankommen.

Eine Möglichkeit, es zu lösen, ist die Verwendung von TPL Dataflow.

Eine weitere schöne Lösung ist die Quelle IEnumerable in Partitionen und Prozesselemente in jeder Partition nacheinander zu unterteilen, wie in this blog post beschrieben:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll(
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate 
     { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current); 
     })); 
} 
3

Während die Datenfluß-Bibliothek ist sehr gut, ich denke, es ist ein bisschen schwer ist, wenn sie nicht mit Blockzusammensetzung. Ich würde dazu neigen, etwas wie die Verlängerungsmethode unten zu verwenden. Im Gegensatz zur Partitioner-Methode führt dies die asynchronen Methoden im aufrufenden Kontext aus. Der Cloat ist, dass, wenn Ihr Code nicht wirklich asynchron ist oder einen "schnellen Pfad" verwendet, er synchron läuft, da keine Threads vorhanden sind werden explizit erstellt.

public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel) 
{ 
    var tasks = new List<Task>(); 

    foreach (var item in items) 
    { 
     tasks.Add(asyncAction(item)); 

     if (tasks.Count < maxParallel) 
       continue; 

     var notCompleted = tasks.Where(t => !t.IsCompleted).ToList(); 

     if (notCompleted.Count >= maxParallel) 
      await Task.WhenAny(notCompleted); 
    } 

    await Task.WhenAll(tasks); 
} 
3

alte Frage, aber ich möchte eine alternative leichte Lösung mit der SemaphoreSlim Klasse vorzuschlagen. Referenzieren Sie einfach System.Threading.

SemaphoreSlim sem = new SemaphoreSlim(4,4); 

foreach (var service in RunData.Demand) 
{ 

    await sem.WaitAsync(); 
    Task t = Task.Run(async() => 
    { 
     var availabilityResponse = await client.QueryAvailability(serviceCopy));  
     // do your other stuff here with the result of QueryAvailability 
    } 
    t.ContinueWith(sem.Release()); 
} 

Der Semaphor fungiert als Verriegelungsmechanismus. Sie können das Semaphor nur aufrufen, indem Sie Wait (WaitAsync) aufrufen, das eins von der Zählung subtrahiert. Anruferklärung fügt eins zur Zählung hinzu.

+1

Wenn ich richtig verstehe, je nachdem, wie man dies verwendet, könnte gut mit 'sem.Wait()' anstelle von 'sem.WaitASync()' warten. Dies würde den aufrufenden Thread blockieren. Dies sollte nicht im UI-Thread, sondern in jedem anderen Thread erfolgen. Dies ist möglicherweise der einfachste Weg, um die auszuführende Arbeit zu verwalten. Genauer gesagt, wenn eines der 4 Sems verfügbar ist, wird es sofort fortgesetzt. Wenn nicht, wartet es bis einer verfügbar ist. – ToolmakerSteve

Verwandte Themen