2013-03-24 9 views
8

Ich versuche, einen Conduit zu erstellen, der mehrere Eingabeströme konsumieren kann. Ich muss in der Lage sein, auf den einen oder den anderen der Eingabeströme in keiner bestimmten Reihenfolge (z. B. nicht alternierend) zu warten, was zip unbrauchbar macht. Hier läuft nichts parallel oder nicht-deterministisch ab: Ich warte auf den einen oder den anderen Strom. Ich möchte in der Lage sein, den Code zu schreiben, wie die folgenden (wobei awaitA und awaitB auf dem ersten oder zweiten Eingangsstrom erwarten bzw.):Fusing-Conduits mit mehreren Eingängen

do 
    _ <- awaitA 
    x <- awaitA 
    y <- awaitB 
    yield (x,y) 
    _ <- awaitB 
    _ <- awaitB 
    y' <- awaitB 
    yield (x,y') 

Die beste Lösung, die ich habe, ist den inneren monadisch einer andere Leitung zu machen, z.B.

foo :: Sink i1 (ConduitM i2 o m)() 

die dann

awaitA = await 
awaitB = lift await 

erlaubt Und das funktioniert meistens. Unglücklicherweise scheint es sehr schwierig zu sein, mit der inneren Leitung zu verschmelzen, bevor die äußere Leitung vollständig verbunden ist. Das erste, was ich versuchte, war:

fuseInner :: Monad m => 
       Conduit i2' m i2 -> 
       Sink i1 (ConduitM i2 o m)() -> 
       Sink i1 (ConduitM i2' o m)() 
fuseInner x = transPipe (x =$=) 

Aber das funktioniert nicht, zumindest wenn x Stateful ist seit (x =$=) mehrere Male ausgeführt wird, effektiv x jedes Mal neu zu starten.

Gibt es eine Möglichkeit, fireInner zu schreiben, kurz vor dem Einbruch in das Innere von Conduit (was aussieht, als wäre es ziemlich chaotisch)? Gibt es eine bessere Möglichkeit, mehrere Eingabeströme zu verarbeiten? Bin ich weit darüber hinaus, wofür der Conduit gedacht war?

Danke!

+3

Ich nehme an, dass Sie meinen, dass Sie Elemente erhalten möchten, wie sie aus den zwei 'IO' Eingangsströmen erzeugt werden. Verwenden Sie dazu 'stm-conduit'. –

+0

Ich habe Ihre aktualisierte Frage gelesen. Ist [dies] (http://stackoverflow.com/questions/12496654/is-there-an-iteratee-like-concept-which-pulls-data-from-multiple-sources/12497593#12497593) näher an dem, was Sie hatten im Hinterkopf? Wenn ja, kann ich es in die äquivalente "Conduit" -Version ändern und sie als Antwort einreichen. –

+0

Ich denke, dieser Link beschreibt genau das, was ich probiert habe ('foo' ist ein' Sink' über einer 'ConduitM' Monade). Das Problem ist, ich kann mit dieser Strategie nicht herausfinden, wie man mit dem inneren Kanal verschmelzen kann. – Benson

Antwort

3

Wenn Sie zwei IO -generierte Streams kombinieren möchten, dann ist Gabriels Kommentar die Lösung.

Ansonsten kann nicht auf beide Streams gewartet werden, welcher zuerst einen Wert erzeugt. Conduits sind single-threaded und deterministisch - es verarbeitet jeweils nur eine Pipe. Aber Sie könnten eine Funktion erstellen, die zwei Ströme verschachtelt, lassen sie entscheiden, wann wechseln:

{-# OPTIONS_GHC -fwarn-incomplete-patterns #-} 
import Control.Monad (liftM) 
import Data.Conduit.Internal (
    Pipe (..), Source, Sink, 
    injectLeftovers, ConduitM (..), 
    mapOutput, mapOutputMaybe 
) 

-- | Alternate two given sources, running one until it yields `Nothing`, 
-- then switching to the other one. 
merge :: Monad m 
     => Source m (Maybe a) 
     -> Source m (Maybe b) 
     -> Source m (Either a b) 
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r 
    where 
    goL :: Monad m => Pipe()() (Maybe a)() m() 
        -> Pipe()() (Maybe b)() m() 
        -> Pipe()() (Either a b)() m() 
    goL (Leftover l()) r   = goL l r 
    goL (NeedInput _ c) r   = goL (c()) r 
    goL (PipeM mx) r    = PipeM $ liftM (`goL` r) mx 
    goL (Done _) r     = mapOutputMaybe (liftM Right) r 
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o) 
    goL (HaveOutput c f Nothing) r = goR c r 
    -- This is just a mirror copy of goL. We should combine them together to 
    -- avoid code repetition. 
    goR :: Monad m => Pipe()() (Maybe a)() m() 
        -> Pipe()() (Maybe b)() m() 
        -> Pipe()() (Either a b)() m() 
    goR l (Leftover r())   = goR l r 
    goR l (NeedInput _ c)   = goR l (c()) 
    goR l (PipeM mx)    = PipeM $ liftM (goR l) mx 
    goR l (Done _)     = mapOutputMaybe (liftM Left) l 
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o) 
    goR l (HaveOutput c f Nothing) = goL l c 

Er verarbeitet eine Quelle, bis es Nothing zurückkehrt, schaltet dann auf eine andere, etc. Wenn eine Quelle beendet, der andere ist bis zum Ende verarbeitet.

Als Beispiel können wir zwei Listen kombinieren und verschachteln:

import Control.Monad.Trans 
import Data.Conduit (($$), awaitForever) 
import Data.Conduit.List (sourceList) 

main = (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [ 1..10]) 
       (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110])) 
     $$ awaitForever (\x -> lift $ print x) 

Wenn mehrere Quellen benötigen, könnte merge wie

mergeList :: Monad m => [Source m (Maybe a)] -> Source m a 

etwas angepasst werden würden, die durchlaufen die angegebene Quellenliste, bis alle fertig sind.

+0

Ich denke, sowohl Ihre Lösung als auch Gabriels Kommentar geht davon aus, dass die Upstream-Streams entscheiden sollen, was als nächstes passiert, und das ist nicht mein Ziel. Ich habe meine Frage aktualisiert, um zu klären. – Benson

+0

@Benson Ich bin mir nicht sicher, ob dies möglich ist, weil _contuit_ 'erwarten' (oder genauer gesagt, 'NeedInput') keine Informationen weitergibt, die verwendet werden könnten, um zu entscheiden, welcher Stream gelesen werden soll. Die Reihenfolge der Werte, die von stromaufwärts kommen, kann nicht durch eine Leitung beeinflusst werden. Dies scheint jedoch mit [pools] (http://hackage.haskell.org/package/pipes) möglich zu sein.Sie sind bidirektional und [request'] (http://hackage.haskell.org/packages/archive/pipes/3.2.0/doc/html/Control-Proxy-Class.html#v:request) erlaubt Informationen zu sein Send Upstream, mit dem man einen von zwei Streams auswählen kann. –

3

Diese kann getan werden durch Tauchen in die Einbauten der Leitung. Ich wollte das vermeiden, weil es extrem unordentlich aussah. Basierend auf den Antworten hier klingt es wie es ist kein Weg um es (aber ich würde wirklich eine sauberere Lösung zu schätzen wissen).

Die Hauptschwierigkeit besteht darin, dass (x =$=) eine reine Funktion, aber transPipe gibt die richtige Antwort zu machen, braucht es eine Art von Stateful, funktionsähnliche Dinge:

data StatefulMorph m n = StatefulMorph 
    { stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a) 
    , finalizeStatefulMorph :: n() } 

StatefulMorph m n Stepping nimmt einen Wert in m und gibt in n sowohl diesen Wert als auch den nächsten StatefulMorph zurück, der verwendet werden sollte, um den nächsten m Wert zu transformieren. Die letzte StatefulMorph sollte (was im Falle des „Stateful (x =$=)“ abgeschlossen werden, schließt die x Leitung

Conduit Fusion als StatefulMorph umgesetzt werden können, um den Code für pipeL mit geringfügigen Änderungen unter Verwendung der Signatur ist..:

fuseStateful :: Monad m 
      => Conduit a m b 
      -> StatefulMorph (ConduitM b c m) (ConduitM a c m) 

ich auch einen Ersatz für transPipe (ein Spezialfall von hoist) müssen die StatefulMorph Werte anstelle von Funktionen verwendet.

class StatefulHoist t where 
    statefulHoist :: (Monad m, Monad n) 
        => StatefulMorph m n 
        -> t m r -> t n r 

Eine StatefulHoist Instanz für ConduitM i o kann mit dem Code für transPipe mit einigen geringfügigen Änderungen geschrieben werden.

fuseInner ist dann einfach zu implementieren.

fuseInner :: Monad m 
      => Conduit a m b 
      -> ConduitM i o (ConduitM b c m) r 
      -> ConduitM i o (ConduitM a c m) r 
fuseInner left = statefulHoist (fuseStateful left) 

Ich habe eine ausführlichere Erklärung here geschrieben und veröffentlicht den vollständigen Code here. Wenn jemand eine sauberere Lösung oder eine, die die Conduit public API verwendet, erstellen Sie diese bitte.

Danke für alle Vorschläge und Eingaben!

Verwandte Themen