2015-09-07 6 views
8

Ich habe mit einem Designproblem in Haskell zu kämpfen, das ich nicht elegant und befriedigend lösen kann. Ich habe ein System, das im Kern auf dem Konzept Event Sourcing basiert: der Zustand des Systems ergibt sich aus der Anwendung einer Sequenz von Ereignissen in einen Ausgangszustand. Es gibt verschiedene Arten von Veranstaltungen, jede Art auf eine bestimmte Komponente des Systems durch eine Art Familie verwendet werden:Wie schreibe ich einen Event-Bus in Haskell?

class Model a where 
    data Event a :: * 
    apply :: Event a -> a -> a 

instance Model Foo where 
    data Event Foo = Foo Int 
    ... 

instance Model Bar where 
    data Event Bar = Bar String 
    ... 

Derzeit ist das System 100% synchron und gekoppelt, wobei jedes Modell des Zugriff der Ereignisse an alle anderen Modellen und dies wird schnell ein Chaos, so will ich die Dinge durch die Einführung eines EventbusBus Events so entkoppeln soll ich in der Lage sein, so etwas wie zu schreiben dispatch :: Bus Events -> Consumer (Event Foo) -> Bus Events einige Verbraucher von Event Foo zu einem Bus Events vorausgesetzt, es gibt einige zu befestigen ist Form der Subtypisierung oder Subsumtion zwischen Event Foo und Events. Dann kann ich Asynchronität hinzufügen, indem Sie sicherstellen, dass jeder Benutzer in seinen eigenen Threads ausgeführt wird.

Aus Systemsicht würde dies mir erlauben, sicherzustellen, dass jede Komponente unabhängig packagable ist, Abhängigkeiten zu einer Teilmenge aller Ereignisse beschränkend. Events Typ würde auf der gesamten Anwendungsebene definiert werden. Dieses Problem sieht täuschend ähnlich zeitdiskreten FRP, aber ich kann nicht in der Lage sein, meinen Kopf um ihn herum zu wickeln ...

Hat jemand schon etwas Ähnliches behandelt und wenn ja, wie?

EDIT:

Ich kam mit dem folgenden Code auf, die keinen Gebrauch von Source macht aber durch @ stark inspiriert Vorschlag der Cirdec:

import   Control.Applicative 
import   Control.Concurrent 
import   Control.Concurrent.STM 
import   Control.Monad.Reader 
import qualified Data.Vector   as V 

type Handlers e = V.Vector (Handler e) 

data EventBus e = EventBus { handlers :: Handlers e 
          , eventQueue :: TChan e 
          , eventThread :: MVar ThreadId 
          } 

newBus :: IO (EventBus e) 
newBus = do 
    chan <- newTChanIO 
    var <- newEmptyMVar 
    return $ EventBus V.empty chan var 

addHandler :: Handler e -> EventBus e -> EventBus e 
addHandler h [email protected]{..} = b { handlers = V.snoc handlers h } 

removeHandler :: Int -> EventBus e -> EventBus e 
removeHandler idx [email protected]{..} = b { handlers = let (h,t) = V.splitAt idx handlers 
                in h V.++ V.tail t } 

startBus :: EventBus e -> IO (EventBus e) 
startBus [email protected]{..} = do 
    tid <- forkIO (runBus b) 
    putMVar eventThread tid 
    return b 

runBus :: EventBus e -> IO() 
runBus [email protected]{..} = do 
    _ <- takeMVar eventThread 
    forever $ do 
    e <- liftIO $ atomically $ readTChan eventQueue 
    v <- newTVarIO b 
    runReaderT (runEvents $ publish e) v 

-- | A monad to handle pub/sub of events of type @[email protected] 
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a } 
        deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e))) 

newtype Handler e = Handler { handle :: Events e()     -- Unsubscription function 
            -> Events e (e -> Events e()) -- what to do with events @[email protected] 
          } 


-- | Register a new @Handler [email protected] within given @Events [email protected] context 
subscribe :: Handler e -> Events e() 
subscribe h = do 
    bus <- ask 
    liftIO $ atomically $ modifyTVar' bus (addHandler h) 

unsubscribe :: Int -> Events e() 
unsubscribe idx = do 
    bus <- ask 
    liftIO $ atomically $ modifyTVar' bus (removeHandler idx) 

publishBus :: EventBus e -> e -> IO() 
publishBus EventBus{..} = atomically . writeTChan eventQueue 

publish :: e -> Events e() 
publish event = do 
    EventBus{..} <- ask >>= liftIO . atomically . readTVar 
    forM_ (zip (V.toList handlers) [0..]) (dispatch event) 

dispatch :: e -> (Handler e, Int) -> Events e() 
dispatch event (Handler h, idx) = do 
    hdl <- h (unsubscribe idx) 
    hdl event 

printer :: (Show s) => String -> Handler s 
printer prefix = Handler (\ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e)) 
+0

so suchen Sie so etwas wie das beobachtbare Muster für Ihre Veranstaltungen oder? IMO kannst du es genauso implementieren wie du es in OO machen würdest (mit 'IORef' oder was auch immer, wenn du' Subscribe' ähnliches Verhalten willst - oder den Handlern bei der Erstellung gibst wenn du nicht willst) - die Frage ist: Wirklich Brauchen Sie das oder sollten Ihre * Befehle/-handler * dafür verantwortlich sein? – Carsten

+0

@ Carsten Art von ja.Für die eigentliche Installation plane ich, 'TChan' zu verwenden, die die nette Eigenschaft haben, Pub/Subfähigkeiten zur Verfügung zu stellen, aber das ist nicht wirklich mein Problem. Ich mache mir mehr Gedanken über den allgemeinen Design-Ansatz, das sieht so aus, als würde ich versuchen, so etwas wie typisierte Schauspieler zu implementieren oder zeitdiskreten FRP neu zu erfinden, aber ich bin verwirrt. – insitu

+0

Was ist das Problem, das Sie versuchen zu lösen? Ihre Frage lautet, wie Sie ein Problem auf eine bestimmte Art und Weise lösen können und nicht wie Sie das Problem lösen können. Es ist durchaus möglich, dass die Lösung bereits existiert: oo Style-Events, Observables, FRP oder Concurrent Pipes. – Cirdec

Antwort

3

Eine Quelle der Ereignisse a s trägt, die kann abonniert werden, hat folgenden Typ

type Source m a = (a -> m()) -> m (m()) 
        |    | ^--- how to unsubscribe    
        |    ^--- how to subscribe 
        ^--- what to do when an `a` happens 

Ein Verbraucher oder Hand ler von Ereignissen ist naiv etwas, das eine Ereignisquelle und abonniert hat es

type Handler m a = (Source m a   ) -> m() 
       = ((a -> m()) -> m (m())) -> m() 
               ^-- set up the consumer. 

Dies ist ein wenig verworren Dinge umkehren, wir können und eine schönere Darstellung für einen Event-Handler akzeptiert:

type Handler m a = m() -> m (a -> m()) 
        |  | ^-- what to do when an `a` happens 
        |  ^-- set up the consumer 
        ^-- how to unsubscribe 

Die ursprüngliche Ereignisquelle war ein bisschen schwierig zu verwenden; Ein Abonnent möchte sich möglicherweise als Reaktion auf ein Ereignis abmelden. In diesem Fall müssen die Benutzer die resultierende Abmeldungsaktion rekursiv aufrufen, um festzulegen, was bei einem Ereignis zu tun ist. Beginnend mit der schöneren Definition eines Handler haben wir dieses Problem nicht. Eine Ereignisquelle akzeptiert jetzt einen Ereignishandler und veröffentlicht ihn.

type Source m a = (Handler m a   ) -> m() 
       = (m() -> m (a -> m())) -> m() 
              ^-- how to subscribe 
+1

Danke für den Vorschlag, das sieht nach dem aus, was ich suche, außer einem fehlenden Teil: Wie definiere ich einen 'Handler ma' so, dass er an ein' Source mb' übergeben werden kann, wo 'a: insitu

+0

Eigentlich scheint das nicht so einfach zu sein, weil es bedeuten würde, einige' a in einen Summentyp "b", dann an den relevanten "Handler c" senden, so dass "a: insitu

Verwandte Themen