2016-12-21 1 views
2

Ich möchte eine Sammlung von io-gebundenen Jobs gleichzeitig verarbeiten, aber die Anzahl der ausstehenden (aktiv ausgeführten) gleichzeitigen Jobs begrenzen/begrenzen.gebundene/gleichzeitige Jobs drosseln, ohne einen Thread pro Job zu erstellen

Chunking ist eine einfache Möglichkeit, die Nebenläufigkeit zu erhöhen, erzeugt jedoch Engpässe, wenn die Elemente unterschiedliche Zeit benötigen.

Die Art, wie ich dies gefunden habe, hat einige Probleme 1). Gibt es einen Weg, die unten genannten Probleme zu vermeiden und dabei gleichzeitig idiomatisch und prägnant zu bleiben?

1) Verwenden Sie eine BlockingCollection (siehe unten). Dies führt jedoch zu einer Lösung, bei der die Nebenläufigkeit hier durch boundedSize Anzahl von "Consumer" -Threads erzeugt wird. Ich suche eine Lösung, die nicht boundedSize Anzahl der Threads benötigt, um boundedSize gleichzeitige Jobs zu erreichen. (Was ist, wenn boundedSize sehr groß ist?). Ich habe nicht gesehen, wie ich einen Gegenstand nehmen, verarbeiten und dann Signalvervollständigung. Ich kann nur Gegenstände nehmen ... und da ich nicht die ganze Liste auf einmal durchreißen möchte, muss der Konsument seine Arbeit synchron ausführen.

type JobNum = int 

let RunConcurrentlyBounded (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) = 

    // create a BlockingCollection 
    use bc = new BlockingCollection<Async<Unit>>(boundedSize) 

    // put async jobs on BlockingCollection 
    Async.Start(async { 
     { start .. finish } 
     |> Seq.map mkJob 
     |> Seq.iter bc.Add 
     bc.CompleteAdding() 
    }) 

    // each consumer runs it's job synchronously 
    let mkConsumer (consumerId:int) = async { for job in bc.GetConsumingEnumerable() do do! job } 

    // create `boundedSize` number of consumers in parallel 
    { 1 .. boundedSize } 
    |> Seq.map mkConsumer 
    |> Async.Parallel 
    |> Async.RunSynchronously 
    |> ignore 

let Test() = 
    let boundedSize = 15 
    let start = 1 
    let finish = 50 
    let mkJob = (fun jobNum -> async { 
     printfn "%A STARTED" jobNum 
     do! Async.Sleep(5000) 
     printfn "%A COMPLETED" jobNum 
    }) 
    RunConcurrentlyBounded boundedSize start finish mkJob 

Ich bin mir dessen bewusst TPL und Mailbox-Prozessoren, aber dachte, dass es vielleicht etwas einfacher & robust waren, vermeidet aber die hohe Zahl der Thread-Erzeugung Route.

Idealerweise würde es nur einen Produzenten-Thread und einen Consumer-Thread geben; Ich vermute, dass BlockingCollection für einen solchen Fall nicht die richtige Parallelität primitive ist?

+0

Warum nicht TPL? Es ist ziemlich einfach zu benutzen. – Asti

Antwort

0

das scheint so gut wie ich werde, mit SemaphoreSlim.

Ich nehme an, der zugrundeliegende ThreadPool steuert hier wirklich die Nebenläufigkeit.

let RunConcurrentlySemaphore (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) = 

    use ss = new SemaphoreSlim(boundedSize); 

    { start .. finish } 
     |> Seq.map (mkJob >> fun job -> async { 
      do! Async.AwaitTask(ss.WaitAsync()) 
      try do! job finally ss.Release() |> ignore 
     }) 
     |> Async.Parallel 
     |> Async.RunSynchronously 
Verwandte Themen