2013-08-27 10 views
14

Angenommen, ich habe ein einfaches Erzeuger-/Verbrauchermodell, bei dem der Verbraucher einen bestimmten Status an den Hersteller zurückgeben möchte. Zum Beispiel können die stromabwärts fließenden Objekte Objekte sein, die wir in eine Datei schreiben wollen, und die vorgelagerten Objekte ein Token sein, das darstellt, wo das Objekt in die Datei geschrieben wurde (z. B. ein Offset).Idiomatische bidirektionale Pipes mit Downstream-Status ohne Verlust

Diese beiden Prozesse könnte wie folgt aussehen (mit pipes-4.0),

{-# LANGUAGE GeneralizedNewtypeDeriving #-} 

import Pipes 
import Pipes.Core 
import Control.Monad.Trans.State  
import Control.Monad 

newtype Object = Obj Int 
       deriving (Show) 

newtype ObjectId = ObjId Int 
       deriving (Show, Num) 

writeObjects :: Proxy ObjectId Object() X IO r 
writeObjects = evalStateT (forever go) (ObjId 0) 
    where go = do i <- get 
       obj <- lift $ request i 
       lift $ lift $ putStrLn $ "Wrote "++show obj 
       modify (+1) 

produceObjects :: [Object] -> Proxy X() ObjectId Object IO() 
produceObjects = go 
    where go [] = return() 
     go (obj:rest) = do 
      lift $ putStrLn $ "Producing "++show obj 
      objId <- respond obj 
      lift $ putStrLn $ "Object "++show obj++" has ID "++show objId 
      go rest 

objects = [ Obj i | i <- [0..10] ] 

So einfach das auch sein mag, ich habe ein gutes Stück von Schwierigkeiten hatte Argumentation darüber, wie sie zu komponieren. Im Idealfall würde man eine Push-basierte wie die folgende Ablaufsteuerung wünschen,

  1. writeObjects beginnt, indem auf request Blockierung stromaufwärts der anfänglichen ObjId 0 gesendet hat.
  2. produceObjects das erste Objekt sendet, Obj 0 stromab
  3. writeObjects das Objekt schreibt und seinen Zustand inkrementiert, und wartet auf request, diesmal stromauf ObjId 1
  4. respond in produceObjects kehrt mit ObjId 0
  5. produceObjects Senden geht bei Schritt (2) mit dem zweiten Objekt, Obj 1

Mein erster Versuch war mit Push-basierten Zusammensetzung wie folgt

main = void $ run $ produceObjects objects >>~ const writeObjects 

Beachten Sie die Verwendung von const um die ansonsten inkompatible Typen arbeiten (dies wahrscheinlich ist, wo das Problem liegt). In diesem Fall jedoch finden wir, dass ObjId 0 gegessen wird,

Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 1 
Producing Obj 1 
... 

Ein Pull-basierten Ansatz,

main = void $ run $ const (produceObjects objects) +>> writeObjects 

leidet ein ähnliches Problem, diesmal fallen Obj 0.

Wie könnte man diese Stücke in der gewünschten Weise komponieren?

Antwort

14

Die Auswahl der zu verwendenden Zusammensetzung hängt davon ab, welche Komponente den gesamten Prozess initiieren soll. Wenn Sie möchten, dass die Downstream-Rohrleitung den Prozess einleitet, dann möchten Sie eine Pull-basierte Zusammensetzung verwenden (dh (>+>)/(+>>)). Wenn Sie jedoch möchten, dass die Upstream-Rohrleitung den Prozess einleitet, sollten Sie eine Push-basierte Zusammensetzung verwenden (z. B. (>>~)/(>~>)). . Die Typfehler, die Sie bekommen haben, haben Sie eigentlich gewarnt, dass es einen logischen Fehler in Ihrem Code gibt: Sie haben nicht eindeutig festgestellt, welche Komponente den Prozess zuerst initiiert.

Aus Ihrer Beschreibung geht hervor, dass Sie möchten, dass der Steuerungsfluss von produceObjects beginnt, damit Sie die Push-basierte Komposition verwenden können. Sobald Sie die Komposition auf Push-Basis verwenden, wird Ihnen der Typ des Kompositionsoperators alles sagen, was Sie darüber wissen müssen, wie Sie Ihren Code reparieren können.Ich werde seine Art nehmen und es auf Ihre Zusammensetzung Kette spezialisiert:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types 
(>>~) :: Server ObjectId Object IO() 
     -> (Object -> Client ObjectId Object IO()) 
     -> Effect IO() 

Wie Sie vielleicht schon bemerkt haben, die Art Fehler Sie bekam, wenn man versucht, (>>~) sagte Ihnen zu verwenden, dass Sie ein Argument vom Typ fehlten Object zu Ihrer writeObjects Funktion. Dies erzwingt statisch, dass Sie keinen Code in writeObjects ausführen können, bevor Sie Ihre erste Object (durch das ursprüngliche Argument) erhalten.

Die Lösung ist Ihre writeObjects Funktion wie folgt zu umschreiben:

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj0 = evalStateT (go obj0) (ObjId 0) 
    where go obj = do i <- get 
        lift $ lift $ putStrLn $ "Wrote "++ show obj 
        modify (+1) 
        obj' <- lift $ request i 
        go obj' 

Dies ergibt dann das richtige Verhalten:

>>> run $ produceObjects objects >>~ writeObjects 
Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 0 
Producing Obj 1 
Wrote Obj 1 
Object Obj 1 has ID ObjId 1 
Producing Obj 2 
Wrote Obj 2 
Object Obj 2 has ID ObjId 2 
Producing Obj 3 
Wrote Obj 3 
Object Obj 3 has ID ObjId 3 
Producing Obj 4 
Wrote Obj 4 
Object Obj 4 has ID ObjId 4 
Producing Obj 5 
Wrote Obj 5 
Object Obj 5 has ID ObjId 5 
Producing Obj 6 
Wrote Obj 6 
Object Obj 6 has ID ObjId 6 
Producing Obj 7 
Wrote Obj 7 
Object Obj 7 has ID ObjId 7 
Producing Obj 8 
Wrote Obj 8 
Object Obj 8 has ID ObjId 8 
Producing Obj 9 
Wrote Obj 9 
Object Obj 9 has ID ObjId 9 
Producing Obj 10 
Wrote Obj 10 
Object Obj 10 has ID ObjId 10 

Sie fragen sich vielleicht, warum diese Anforderung, dass eines der beiden Rohre ein nimmt Anfangsargument macht Sinn, abgesehen von der abstrakten Begründung, dass dies die Kategoriegesetze erfordern. Die einfache englische Erklärung ist, dass die Alternative ist, dass Sie die erste übertragene Object "zwischen" die zwei Rohre Puffer benötigen würden, bevor writeObjects ihre erste request Erklärung erreichte. Dieser Ansatz erzeugt eine Menge problematisches Verhalten und fehlerhafte Eckfälle, aber das wahrscheinlich wichtigste Problem ist, dass die Rohrkomposition nicht mehr assoziativ ist und sich die Reihenfolge der Effekte basierend auf der Reihenfolge ändert, in der Sie die Dinge erstellt haben.

Das Schöne an den bidirektionalen Pipe-Composition-Operatoren ist, dass die Typen so funktionieren, dass man immer schlussfolgern kann, ob eine Komponente "aktiv" ist (dh Kontrolle auslöst) oder "passiv" ist (dh auf Input wartet) indem ich den Typ studiere. Wenn die Komposition sagt, dass ein bestimmtes Rohr (wie writeObjects) ein Argument nehmen muss, dann ist es passiv. Wenn es kein Argument benötigt (wie produceObjects), ist es aktiv und initiiert die Kontrolle. Die Komposition zwingt also, höchstens eine aktive Pipe in der Pipeline zu haben (die Pipe, die kein anfängliches Argument annimmt), und das ist die Pipe, die die Kontrolle beginnt.

4

Die 'Const' sind, wo Sie die Daten fallen lassen. Um alle Daten zu erhalten, möchten Sie wahrscheinlich einen Push-basierten Workflow tun wie folgt:

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj = go 0 obj 
    where 
    go objid obj = do 
     lift $ putStrLn $ "Wrote "++show obj 
     obj' <- request objid 
     go (objid + 1) obj' 

-- produceObjects as before 

main = void $ run $ produceObjects objects >>~ writeObjects 
2

Wir haben dies auf der Mailing-Liste diskutiert, aber ich dachte, ich es oben werfen würde hier als gut für diejenigen, die interessiert sind.

Ihr Problem besteht darin, dass Sie zwei Coroutinen haben, die beide bereit sind, sich gegenseitig Werte auszuspucken. Keine benötigt die Eingabe der anderen, um einen Wert zu erzeugen. Wer kann zuerst gehen? Gut gesagt Sie es selbst:

writeObjects beginnt auf Wunsch blockiert, ist die erste gesendet hat ObjId 0 Upstream

Okay, das heißt, wir produceObjects so verzögern müssen, dass es für ein ObjId Signal wartet, bevor das entsprechende Objekt ausspucken (obwohl es anscheinend nicht genannte ID benötigt).

Eintauchen in Proxy-Interna, hier ist die magische Beschwörung, die ich nicht umständlich zu dieser Zeit erklären werde.Die Grundidee ist einfach Eingabe zu nehmen, bevor Sie es brauchen, dann die Eingabe gilt, wenn nötig, aber dann so tun, wie Sie einen neuen Eingang müssen (auch wenn Sie nicht dass eine gerade noch brauchen):

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r 
delayD p0 b' = case p0 of 
    Request a' f -> Request a' (go . f) 
    Respond b g -> Respond b (delayD (g b')) 
    M m   -> M (liftM go m) 
    Pure r  -> Pure r 
    where 
    go p = delayD p b' 

Jetzt können Sie diese auf produceObjects objects anstelle von const, und Ihr zweiter Versuch funktioniert wie gewünscht:

delayD (produceObjects objects) +>> writeObjects 

Wir diskutieren delayD auf der Mailing-Liste, um zu sehen, ob es verdient die Aufnahme in den Standard-Pipes Repertoire.