2011-01-06 25 views
3

Ich möchte eine parallele Pipeline in C# erstellen. Ich habe eine Schnittstelle namens IOperation declaered:Parallele Pipeline in C#

public interface IOperation<Tin, Tout> 
{ 
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input); 
} 

Jetzt will ich eine Klasse schreiben, die mehrere dieser Operationen parallel ausführt. ich mit diesem Pagan:

public class Pipeline : IPipeline 
{ 
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>(); 
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>(); 
    public void Register(IOperation<Object, Object> operation) 
    { 
     operations.Add(operation); 
    } 

    public void Execute() 
    { 

    } 
} 

aber ich finde keine Lösung, um die Operationen und die Puffer zwischen den Operationen zu sparen, weil sie alle verschiedene generische Typen haben. Hat jemand eine Idee?

+1

Könnten Sie klarstellen, was Sie mit "speichern" meinen? –

+0

Ich möchte sie in den beiden Listen am Anfang der Pipeline-Klasse haben – AntonS

Antwort

1

Haben Sie in Betracht gezogen, Parallel.ForEach von der TPL zu verwenden?
Die Task Parallel Library (TPL) ist eine Gruppe öffentlicher Typen und APIs in .NET 4.

+0

Kann ich eine foreach-Schleife, die auf neue Eingaben wartet, wenn nichts mehr in der Liste ist? – AntonS

+1

Ich nehme es an. Verwenden Sie statt einer List eine Methode, die ein IEnumerable zurückgibt. Wenn in diesem Verfahren Daten verfügbar sind, dann "gebe die Daten zurück", ansonsten warte auf ein Synchronisationssignal (z. B. wenn ein Monitor gepulst ist). – ZunTzu

1

Es ist nicht sehr klar, wie Ihre Pipeline arbeiten soll. Warum vertreibst du BlockingCollections? Warum verwenden Sie Generika, aber setzen Sie object in den Typ?

Betrachten Sie stattdessen eine Pipeline, die Sie mit deleggates des Typs Action laden, und verwenden Sie dann die parallele Taskbibliothek, um Tasks zu erstellen, die diese Aktionen parallel ausführen.

public void Register(Action operation) 
    { 
     operations.Add(operation); 
    } 

public void Execute() 
    { 
     foreach (var action in operations) 
      Task.StartNew(operation); 
    } 

Aber das ist nicht wirklich eine "Pipeline", es ist nur ein Bündel von Operationen, die parallel ausgeführt werden.

Eine Pipeline hätte normalerweise Pipeline-Schritte mit einem Eingabetyp und einem Ausgabetyp. Sie könnten damit umgehen, indem Sie etwas wie PipelineStep<T,U> erstellen, und Sie würden jeden Pipeline-Schritt konstruieren, der in einer Func-Operation übergeben wird. Intern könnte jeder Pipelineschritt einen IEnumerable-Eingang konsumieren und einen IEnumerable-Ausgang erzeugen, und zwar mithilfe von Task oder einfacher mit einer parallelen foreach-Schleife.

Alternativ könnten Sie auch die TPL Task.ContinueWith Methode verwenden, um die Tasks von Eingang zu Ausgang zu verketten.

0

Es gibt einen guten Artikel unter http://msdn.microsoft.com/en-us/library/ff963548.aspx über parallele Pipelines mit BlockingCollection.

Grundsätzlich sollte jeder Schritt eine Ausgabewarteschlange vom Typ BlockingCollection haben. Er nimmt Elemente aus der Ausgabewarteschlange des vorherigen Schritts auf und fügt sie nach der Verarbeitung zur Ausgabe hinzu.

1

Microsoft hat etwas genau so - TPL Dataflow können Sie Blöcke in einer Pipeline definieren, mit fein abgestimmten Steuerelementen, wie sie gepuffert und parallelisiert werden.

Im Gegensatz zu Ihrer Lösung wird ein vollständig asynchrones Push-Design verwendet. Es verwendet kein BlockingCollection (ein blockierendes Pull-Design), und es wird wesentlich schneller sein, wenn Sie eine tiefe Pipeline haben.