Ich möchte eine Sammlung von io-gebundenen Jobs gleichzeitig verarbeiten, aber die Anzahl der ausstehenden (aktiv ausgeführten) gleichzeitigen Jobs begrenzen/begrenzen.gebundene/gleichzeitige Jobs drosseln, ohne einen Thread pro Job zu erstellen
Chunking ist eine einfache Möglichkeit, die Nebenläufigkeit zu erhöhen, erzeugt jedoch Engpässe, wenn die Elemente unterschiedliche Zeit benötigen.
Die Art, wie ich dies gefunden habe, hat einige Probleme 1)
. Gibt es einen Weg, die unten genannten Probleme zu vermeiden und dabei gleichzeitig idiomatisch und prägnant zu bleiben?
1)
Verwenden Sie eine BlockingCollection (siehe unten). Dies führt jedoch zu einer Lösung, bei der die Nebenläufigkeit hier durch boundedSize
Anzahl von "Consumer" -Threads erzeugt wird. Ich suche eine Lösung, die nicht boundedSize
Anzahl der Threads benötigt, um boundedSize
gleichzeitige Jobs zu erreichen. (Was ist, wenn boundedSize
sehr groß ist?). Ich habe nicht gesehen, wie ich einen Gegenstand nehmen, verarbeiten und dann Signalvervollständigung. Ich kann nur Gegenstände nehmen ... und da ich nicht die ganze Liste auf einmal durchreißen möchte, muss der Konsument seine Arbeit synchron ausführen.
type JobNum = int
let RunConcurrentlyBounded (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) =
// create a BlockingCollection
use bc = new BlockingCollection<Async<Unit>>(boundedSize)
// put async jobs on BlockingCollection
Async.Start(async {
{ start .. finish }
|> Seq.map mkJob
|> Seq.iter bc.Add
bc.CompleteAdding()
})
// each consumer runs it's job synchronously
let mkConsumer (consumerId:int) = async { for job in bc.GetConsumingEnumerable() do do! job }
// create `boundedSize` number of consumers in parallel
{ 1 .. boundedSize }
|> Seq.map mkConsumer
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
let Test() =
let boundedSize = 15
let start = 1
let finish = 50
let mkJob = (fun jobNum -> async {
printfn "%A STARTED" jobNum
do! Async.Sleep(5000)
printfn "%A COMPLETED" jobNum
})
RunConcurrentlyBounded boundedSize start finish mkJob
Ich bin mir dessen bewusst TPL und Mailbox-Prozessoren, aber dachte, dass es vielleicht etwas einfacher & robust waren, vermeidet aber die hohe Zahl der Thread-Erzeugung Route.
Idealerweise würde es nur einen Produzenten-Thread und einen Consumer-Thread geben; Ich vermute, dass BlockingCollection für einen solchen Fall nicht die richtige Parallelität primitive ist?
Warum nicht TPL? Es ist ziemlich einfach zu benutzen. – Asti