Ich habe mit einem Designproblem in Haskell zu kämpfen, das ich nicht elegant und befriedigend lösen kann. Ich habe ein System, das im Kern auf dem Konzept Event Sourcing basiert: der Zustand des Systems ergibt sich aus der Anwendung einer Sequenz von Ereignissen in einen Ausgangszustand. Es gibt verschiedene Arten von Veranstaltungen, jede Art auf eine bestimmte Komponente des Systems durch eine Art Familie verwendet werden:Wie schreibe ich einen Event-Bus in Haskell?
class Model a where
data Event a :: *
apply :: Event a -> a -> a
instance Model Foo where
data Event Foo = Foo Int
...
instance Model Bar where
data Event Bar = Bar String
...
Derzeit ist das System 100% synchron und gekoppelt, wobei jedes Modell des Zugriff der Ereignisse an alle anderen Modellen und dies wird schnell ein Chaos, so will ich die Dinge durch die Einführung eines EventbusBus Events
so entkoppeln soll ich in der Lage sein, so etwas wie zu schreiben dispatch :: Bus Events -> Consumer (Event Foo) -> Bus Events
einige Verbraucher von Event Foo
zu einem Bus Events
vorausgesetzt, es gibt einige zu befestigen ist Form der Subtypisierung oder Subsumtion zwischen Event Foo
und Events
. Dann kann ich Asynchronität hinzufügen, indem Sie sicherstellen, dass jeder Benutzer in seinen eigenen Threads ausgeführt wird.
Aus Systemsicht würde dies mir erlauben, sicherzustellen, dass jede Komponente unabhängig packagable ist, Abhängigkeiten zu einer Teilmenge aller Ereignisse beschränkend. Events
Typ würde auf der gesamten Anwendungsebene definiert werden. Dieses Problem sieht täuschend ähnlich zeitdiskreten FRP, aber ich kann nicht in der Lage sein, meinen Kopf um ihn herum zu wickeln ...
Hat jemand schon etwas Ähnliches behandelt und wenn ja, wie?
EDIT:
Ich kam mit dem folgenden Code auf, die keinen Gebrauch von Source
macht aber durch @ stark inspiriert Vorschlag der Cirdec:
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Reader
import qualified Data.Vector as V
type Handlers e = V.Vector (Handler e)
data EventBus e = EventBus { handlers :: Handlers e
, eventQueue :: TChan e
, eventThread :: MVar ThreadId
}
newBus :: IO (EventBus e)
newBus = do
chan <- newTChanIO
var <- newEmptyMVar
return $ EventBus V.empty chan var
addHandler :: Handler e -> EventBus e -> EventBus e
addHandler h [email protected]{..} = b { handlers = V.snoc handlers h }
removeHandler :: Int -> EventBus e -> EventBus e
removeHandler idx [email protected]{..} = b { handlers = let (h,t) = V.splitAt idx handlers
in h V.++ V.tail t }
startBus :: EventBus e -> IO (EventBus e)
startBus [email protected]{..} = do
tid <- forkIO (runBus b)
putMVar eventThread tid
return b
runBus :: EventBus e -> IO()
runBus [email protected]{..} = do
_ <- takeMVar eventThread
forever $ do
e <- liftIO $ atomically $ readTChan eventQueue
v <- newTVarIO b
runReaderT (runEvents $ publish e) v
-- | A monad to handle pub/sub of events of type @[email protected]
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a }
deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e)))
newtype Handler e = Handler { handle :: Events e() -- Unsubscription function
-> Events e (e -> Events e()) -- what to do with events @[email protected]
}
-- | Register a new @Handler [email protected] within given @Events [email protected] context
subscribe :: Handler e -> Events e()
subscribe h = do
bus <- ask
liftIO $ atomically $ modifyTVar' bus (addHandler h)
unsubscribe :: Int -> Events e()
unsubscribe idx = do
bus <- ask
liftIO $ atomically $ modifyTVar' bus (removeHandler idx)
publishBus :: EventBus e -> e -> IO()
publishBus EventBus{..} = atomically . writeTChan eventQueue
publish :: e -> Events e()
publish event = do
EventBus{..} <- ask >>= liftIO . atomically . readTVar
forM_ (zip (V.toList handlers) [0..]) (dispatch event)
dispatch :: e -> (Handler e, Int) -> Events e()
dispatch event (Handler h, idx) = do
hdl <- h (unsubscribe idx)
hdl event
printer :: (Show s) => String -> Handler s
printer prefix = Handler (\ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e))
so suchen Sie so etwas wie das beobachtbare Muster für Ihre Veranstaltungen oder? IMO kannst du es genauso implementieren wie du es in OO machen würdest (mit 'IORef' oder was auch immer, wenn du' Subscribe' ähnliches Verhalten willst - oder den Handlern bei der Erstellung gibst wenn du nicht willst) - die Frage ist: Wirklich Brauchen Sie das oder sollten Ihre * Befehle/-handler * dafür verantwortlich sein? – Carsten
@ Carsten Art von ja.Für die eigentliche Installation plane ich, 'TChan' zu verwenden, die die nette Eigenschaft haben, Pub/Subfähigkeiten zur Verfügung zu stellen, aber das ist nicht wirklich mein Problem. Ich mache mir mehr Gedanken über den allgemeinen Design-Ansatz, das sieht so aus, als würde ich versuchen, so etwas wie typisierte Schauspieler zu implementieren oder zeitdiskreten FRP neu zu erfinden, aber ich bin verwirrt. – insitu
Was ist das Problem, das Sie versuchen zu lösen? Ihre Frage lautet, wie Sie ein Problem auf eine bestimmte Art und Weise lösen können und nicht wie Sie das Problem lösen können. Es ist durchaus möglich, dass die Lösung bereits existiert: oo Style-Events, Observables, FRP oder Concurrent Pipes. – Cirdec