2017-01-01 2 views
2

Ich habe einige Code Haskell, die Pipes verwendet:Wie kann ich eine Pipe gleichzeitig mit Haskells Pipe-Bibliothek erstellen?

module Main(main) where 
import Pipes 

a :: Producer Int IO() 
a = each [1..10] 

b :: Pipe Int Int IO() 
b = do 
    x <- await 
    yield (x*2) 
    b 

c :: Consumer Int IO() 
c = do 
    x <- await 
    lift $ print x 
    c 

main :: IO() 
main = runEffect $ a >-> b >-> c 

Die Pipes.Concurrent tutorial zeigt mehrere Arbeiter zusammen mit Work Stealing verwenden. Wie kann ich etwas ähnliches innerhalb von b tun? Ich möchte b, um seine Arbeit gleichzeitig mit einer festgelegten Anzahl von Arbeitern durchzuführen.

Offensichtlich ist Nebenläufigkeit in diesem genauen Fall nicht nützlich, aber es ist das einfachste Beispiel, das ich mir vorstellen könnte. In meinem echten Anwendungsfall möchte ich einige Webanfragen gleichzeitig mit einer begrenzten Anzahl von Arbeitern machen.

+1

https://hackage.haskell.org/package/pipes-async-0.1.1/docs/src/Pipes-Async.html#buffer sieht so aus, als könnte es so angepasst werden, dass mehrere gegebene Pipes in die TQueue eingegeben werden, So bekommen wir '[Pipe ab] -> Pipe ab'. Ich habe eine [Feature-Anfrage] (https://github.com/jwiegley/pipes-async/issues/1) ... vor einer Stunde gepostet>: D – Gurkenglas

Antwort

2

EDIT: Ich missverstanden, was Sie gefragt haben; Sie können dies in einer Pipe tun, aber ich bin mir nicht sicher, was die Motivation wäre. Ich würde empfehlen, wiederverwendbare Rohrketten zu bauen und sie nur mit Hilfe von Arbeitern zu versenden, anstatt zu versuchen, Arbeiter innerhalb der Rohrleitung zu bauen. Sie verlieren alle Bestellgarantien, dass der erste Eingang der erste ist, wenn Sie ihn in die Leitung selbst einbauen.

Der Abschnitt auf Work Stealing ist, was Sie suchen, dieser Code ist im Grunde wörtlich aus dem Tutorial, aber lasst uns zusammenbrechen, wie es funktioniert. Hier ist ein Weg, könnten wir das tun, was Sie wollen:

module Main(main) where 
import Pipes 
import Pipes.Concurrent 

import Control.Concurrent.Async (async, wait) 
import Control.Concurrent (threadDelay) 
import Control.Monad (forM) 

a :: Producer Int IO() 
a = each [1..10] 

b :: Pipe Int Int IO() 
b = do 
    x <- await 
    yield (x*2) 
    b 

c :: Consumer Int IO() 
c = do 
    x <- await 
    lift $ print x 
    c 

main :: IO() 
main = do 
    (output, input) <- spawn unbounded 
    feeder <- async $ do runEffect $ a >-> toOutput output 
         performGC 

    workers <- forM [1..3] $ \i -> 
    async $ do runEffect $ fromInput input >-> b >-> c 
       performGC 

    mapM_ wait (feeder:workers) 

Die erste Zeile spawn unbounded von Pipes.Concurrent ist es initialisiert eine ‚Mailbox‘, die einen Griff für Ein- und Ausgang hat. Es verwirrte mich zuerst, aber in diesem Fall senden wir Nachrichten an den Ausgang und ziehen sie von der Eingabe. Dies ähnelt einem Push-Pull-Nachrichtenkanal in Sprachen wie Golang.

Wir geben eine Buffer um zu sagen, wie viele Nachrichten wir speichern können, in diesem Fall setzen wir No-Limit mit unbegrenzt.

Okay, damit das Postfach initialisiert wird, können wir jetzt Effect s erstellen, die Nachrichten an es senden. Die Mailbox-Kanäle werden unter Verwendung der STM implementiert, so dass Nachrichten asynchron gesammelt werden können.

Lassen Sie uns einen asynchronen Job erstellen, der das Postfach füttert;

feeder <- async $ do runEffect $ a >-> toOutput output 
        performGC 

Die a >-> toOutput output nur normale Rohr Zusammensetzung ist, müssen wir toOutput Ausgang in ein Rohr konvertieren. Beachten Sie den performGC Aufruf, der auch Teil des IO ist, ermöglicht es Pipes.Concurrent zu bereinigen, nachdem der Auftrag abgeschlossen ist. Wir könnten dies mit forkIO ausführen, wenn wir möchten, aber in diesem Fall verwenden wir async, so dass wir auf das Ergebnis warten können, um später zu beenden. Okay, also unser Postfach sollte asynchron sein Empfangen Nachrichten, lassen Sie uns sie herausziehen und etwas Arbeit zu tun.

workers <- forM [1..3] $ \i -> 
    async $ do runEffect $ fromInput input >-> b >-> c 
      performGC 

Die gleiche Idee wie zuvor, aber dieses Mal laichen wir nur ein paar von ihnen. Wir lesen von der Eingabe wie eine normale Röhre unter Verwendung von fromInput und führen sie dann durch den Rest unserer Kette und bereinigen, wenn wir fertig sind. input stellt sicher, dass jedes Mal, wenn ein Wert abgerufen wird, nur ein Mitarbeiter sie erhält. Wenn alle Jobs, die in output eingegeben werden, abgeschlossen sind (es verfolgt alle offenen Jobs), schließt es die input Pipe und die Worker werden beendet.

Wenn Sie dies in einem Web-Worker-Szenario verwenden, haben Sie eine Hauptschleife, die Anfragen an den Kanal toOutput output sendet und dann so viele Arbeiter spawnt, wie Sie möchten, die in ihre Pipeline von fromInput input ziehen.

Verwandte Themen