EDIT: Ich missverstanden, was Sie gefragt haben; Sie können dies in einer Pipe tun, aber ich bin mir nicht sicher, was die Motivation wäre. Ich würde empfehlen, wiederverwendbare Rohrketten zu bauen und sie nur mit Hilfe von Arbeitern zu versenden, anstatt zu versuchen, Arbeiter innerhalb der Rohrleitung zu bauen. Sie verlieren alle Bestellgarantien, dass der erste Eingang der erste ist, wenn Sie ihn in die Leitung selbst einbauen.
Der Abschnitt auf Work Stealing ist, was Sie suchen, dieser Code ist im Grunde wörtlich aus dem Tutorial, aber lasst uns zusammenbrechen, wie es funktioniert. Hier ist ein Weg, könnten wir das tun, was Sie wollen:
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO()
a = each [1..10]
b :: Pipe Int Int IO()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO()
c = do
x <- await
lift $ print x
c
main :: IO()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
Die erste Zeile spawn unbounded
von Pipes.Concurrent ist es initialisiert eine ‚Mailbox‘, die einen Griff für Ein- und Ausgang hat. Es verwirrte mich zuerst, aber in diesem Fall senden wir Nachrichten an den Ausgang und ziehen sie von der Eingabe. Dies ähnelt einem Push-Pull-Nachrichtenkanal in Sprachen wie Golang.
Wir geben eine Buffer um zu sagen, wie viele Nachrichten wir speichern können, in diesem Fall setzen wir No-Limit mit unbegrenzt.
Okay, damit das Postfach initialisiert wird, können wir jetzt Effect
s erstellen, die Nachrichten an es senden. Die Mailbox-Kanäle werden unter Verwendung der STM implementiert, so dass Nachrichten asynchron gesammelt werden können.
Lassen Sie uns einen asynchronen Job erstellen, der das Postfach füttert;
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
Die a >-> toOutput output
nur normale Rohr Zusammensetzung ist, müssen wir toOutput
Ausgang in ein Rohr konvertieren. Beachten Sie den performGC
Aufruf, der auch Teil des IO ist, ermöglicht es Pipes.Concurrent zu bereinigen, nachdem der Auftrag abgeschlossen ist. Wir könnten dies mit forkIO
ausführen, wenn wir möchten, aber in diesem Fall verwenden wir async
, so dass wir auf das Ergebnis warten können, um später zu beenden. Okay, also unser Postfach sollte asynchron sein Empfangen Nachrichten, lassen Sie uns sie herausziehen und etwas Arbeit zu tun.
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
Die gleiche Idee wie zuvor, aber dieses Mal laichen wir nur ein paar von ihnen. Wir lesen von der Eingabe wie eine normale Röhre unter Verwendung von fromInput
und führen sie dann durch den Rest unserer Kette und bereinigen, wenn wir fertig sind. input
stellt sicher, dass jedes Mal, wenn ein Wert abgerufen wird, nur ein Mitarbeiter sie erhält. Wenn alle Jobs, die in output
eingegeben werden, abgeschlossen sind (es verfolgt alle offenen Jobs), schließt es die input
Pipe und die Worker werden beendet.
Wenn Sie dies in einem Web-Worker-Szenario verwenden, haben Sie eine Hauptschleife, die Anfragen an den Kanal toOutput output
sendet und dann so viele Arbeiter spawnt, wie Sie möchten, die in ihre Pipeline von fromInput input
ziehen.
https://hackage.haskell.org/package/pipes-async-0.1.1/docs/src/Pipes-Async.html#buffer sieht so aus, als könnte es so angepasst werden, dass mehrere gegebene Pipes in die TQueue eingegeben werden, So bekommen wir '[Pipe ab] -> Pipe ab'. Ich habe eine [Feature-Anfrage] (https://github.com/jwiegley/pipes-async/issues/1) ... vor einer Stunde gepostet>: D – Gurkenglas