2013-12-20 12 views
6

Sagen, ich habe 10 Fäden beschäftigt, etwas zu tun, und sie rufen manchmal eine MethodeVersand Arbeit von vielen Threads zu einem synchronen Thread

public HandleWidgets(Widget w) { HeavyLifting(w) } 

Aber ich meine nicht 10 Threads wollen auf HeavyLifting warten (w) , aber senden Sie stattdessen die Arbeit HeavyLifting (w) an einen 11. Thread, den HeavyLifter-Thread, und fahren Sie asynchron fort. Der HeavyLifter-Thread, an den abgesetzt wird, sollte immer derselbe Thread sein, und ich möchte nicht mehrere Threads erstellen (daher kann ich etwas ganz ähnliches nicht tun: C# Asynchronous call without EndInvoke?).

HeavyLifting (w) ist "Feuer und vergessen", dass die Threads, die HandleWidgets() aufrufen, keinen Rückruf oder ähnliches benötigen.

Was ist eine gesunde Praxis dafür?

+3

Ihr Thread-Setup ist seltsam. Statt das "Heavy Lifting" zu mehreren Threads zu parzellieren, parzellieren Sie das Parzellieren zu mehreren Threads und dann einen einzelnen Thread mit der eigentlichen Arbeit. Dies ist das Gegenteil von dem, was Sie generell wollen, wenn Sie versuchen, Gewinne aus mehreren Kernen zu erzielen. – dlev

+0

Ich habe deinen Titel bearbeitet. Bitte lesen Sie "[Sollten die Fragen" Tags "in ihren Titeln enthalten?] (Http://meta.stackexchange.com/questions/19190/)", wobei der Konsens "nein, sie sollten nicht" lautet. –

+0

Hallo Drev, es ist ungewöhnlich. Meine reale Anwendung besteht darin, dass die zehn Threads eine Schnittstelle zu externer Hardware bilden und der elfte Thread ein Problem löst, das sich auf Daten bezieht, die von der Hardware gesammelt werden. Ich sollte nicht mehr als das sagen;) – John

Antwort

1

Grundsätzlich haben Sie Threads, die Produzenten der Arbeit und ein Thread gibt, die ein Verbraucher davon.

Erstellen Sie einen Thread und haben Sie Take von einem BlockingCollection in einer Schleife. Dies ist Ihr Consumer Thread, der HeavyLifting aufrufen wird. Es wird einfach warten, bis ein Artikel verfügbar ist, und ihn dann verarbeiten:

Ein Aufruf von Take kann blockieren, bis ein Artikel verfügbar ist, der entfernt werden soll.

Die anderen Threads können einfach Elemente zur Sammlung hinzufügen.

Beachten Sie, dass BlockingCollection bieten keine Garantie Reihenfolge des Postens hinzugefügt/von selbst entfernt:

Die Reihenfolge, in der ein Element entfernt ist von der Art der Sammlung hängt verwendet, um die Blocking Instanz zu erstellen. Wenn Sie ein Blocking Objekt erstellen, können Sie die Art der Sammlung angeben verwenden

1

Erstellen Sie eine Warteschlange, die unter allen Threads und einem Semaphor geteilt wird, die auch unter allen Threads geteilt wird. Arbeiter-Threads (das sollte nicht für HeavyLifter WAIT) POST-Anforderungen wie folgt aus:

lock (queue) 
{ 
    queue.Enqueue(request); 
    semaphore.Release(); 
} 

HeavyLifter ist ein Hintergrund-Thread (stoppt nicht Prozess vom Verlassen) und führt den folgenden Code in Endlosschleife:

while (true) 
{ 
    semaphore.WaitOne(); 
    Request item = null 
    lock (queue) 
    { 
     item = queue.Dequeue(); 
    } 
    this.ProcessRequest(item); 
} 

EDIT: Tippfehler.

1

Sie können eine begrenzte Parallelität erstellen TaskScheduler als Task Fabrik verwendet werden, wie in this example from MSDN, mit der Beschränkung auf Einzelgewinde versehen:

var lcts = new LimitedConcurrencyLevelTaskScheduler(1); 
TaskFactory factory = new TaskFactory(lcts); 

Die Ihre Funktion implementieren, wie:

public HandleWidgets(Widget w) 
{ 
    factory.StartNew(() => HeavyLifting(w)); 
} 
1

--- EDIT ---

Ich habe gerade festgestellt, Sie brauchen "Feuer und vergessen ", in diesem Fall würde eine blockierende Sammlung allein ausreichen. Die Lösung unten ist wirklich für komplexere Szenarien, in denen Sie benötigen, um ein Ergebnis zurückzukehren, oder eine Ausnahme zu propagieren, oder zu komponieren Aufgaben in irgendeiner Art und Weise (zB über async/await) etc ...


Verwenden TaskCompletionSource aussetzen die Arbeit in der "synchronen" Thread als Task-basierte API zu den "Client" -Threads.

Für jeden Aufruf von HandleWidgets (CT = "Client-Thread", "ST" = synchrones Gewinde):

  • CT: Erstellen eines separaten TaskCompletionSource.
  • CT: Versand HeavyLifting an die ST (wahrscheinlich durch eine BlockingCollection; auch die TaskCompletionSource zu ihm, so kann es den letzten Schritt unten tun).
  • CT: Return TaskCompletionSource 's Task an den Aufrufer ohne warten auf die Arbeit am ST zu beenden.
  • CT: normal weiterfahren. Wenn/wenn es nicht mehr möglich ist, fortzufahren, ohne auf HeavyLifting zu warten (im ST), warten Sie auf die obige Aufgabe.
  • ST: Wenn HeavyLifting beendet ist, rufen SetResult (oder SetException oder SetCanceled, je nach Bedarf), die alle CTs deblockiert, die derzeit auf die Aufgabe warten könnten.
2

ich überrascht, keiner der anderen Antworten bin TPL DataFlow hier erwähnen. Sie verbinden eine Reihe von Blöcken miteinander und veröffentlichen Daten über die Kette. Sie können die Parallelität von jedem Block explizit steuern, so dass Sie Folgendes tun könnten:

var transformBlk = 
    new TransformBlock<int,int>(async i => { 
     await Task.Delay(i); 
     return i * 10; 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 10}); 

var processorBlk= 
    new ActionBlock<int>(async i => { 
     Console.WriteLine(i); 
    },new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 1}); 

transformBlk.LinkTo(processorBlk); 
var data = Enumerable.Range(1, 20).Select(x => x * 1000); 
foreach(var x in data) 
{ 
    transformBlk.Post(x); 
} 
+0

Ich denke, ich könnte dieses NuGet-Paket in einigen meiner Projekte installieren ... –

Verwandte Themen