diff options
Diffstat (limited to 'fig-bus')
| -rw-r--r-- | fig-bus/fig-bus.cabal | 45 | ||||
| -rw-r--r-- | fig-bus/main/Main.hs | 25 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus.hs | 62 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/Client.hs | 65 |
4 files changed, 197 insertions, 0 deletions
diff --git a/fig-bus/fig-bus.cabal b/fig-bus/fig-bus.cabal new file mode 100644 index 0000000..ec68ef5 --- /dev/null +++ b/fig-bus/fig-bus.cabal @@ -0,0 +1,45 @@ +cabal-version: 3.4 +name: fig-bus +version: 0.1.0.0 + +common defaults + ghc-options: -Wall + default-language: GHC2021 + default-extensions: NoImplicitPrelude PackageImports LambdaCase MultiWayIf OverloadedStrings OverloadedLists OverloadedRecordDot DuplicateRecordFields RecordWildCards NoFieldSelectors BlockArguments ViewPatterns TypeFamilies DataKinds GADTs + +common deps + build-depends: + base + , binary + , bytestring + , containers + , directory + , containers + , directory + , filepath + , megaparsec + , mtl + , network + , safe-exceptions + , text + , time + , transformers + , unordered-containers + , vector + , fig-utils + +library + import: defaults + import: deps + hs-source-dirs: src + exposed-modules: + Fig.Bus + Fig.Bus.Client + +executable fig-bus + import: defaults + import: deps + build-depends: fig-bus, optparse-applicative + hs-source-dirs: + main + main-is: Main.hs
\ No newline at end of file diff --git a/fig-bus/main/Main.hs b/fig-bus/main/Main.hs new file mode 100644 index 0000000..9f84fd2 --- /dev/null +++ b/fig-bus/main/Main.hs @@ -0,0 +1,25 @@ +module Main where + +import Fig.Prelude + +import Options.Applicative + +import qualified Fig.Bus + +data Opts = Opts + { host :: Text + , port :: Text + } + +parseOpts :: Parser Opts +parseOpts = Opts + <$> strOption (long "host" <> metavar "HOST" <> help "Interface to bind" <> value "localhost") + <*> strOption (long "port" <> metavar "PORT" <> help "Port to bind" <> showDefault <> value "32050") + +main :: IO () +main = do + opts <- execParser $ info (parseOpts <**> helper) + ( fullDesc + <> header "fig-bus - a pub/sub message bus" + ) + Fig.Bus.main (Just opts.host, opts.port) diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus.hs new file mode 100644 index 0000000..2102864 --- /dev/null +++ b/fig-bus/src/Fig/Bus.hs @@ -0,0 +1,62 @@ +module Fig.Bus (main) where + +import Fig.Prelude + +import Control.Concurrent.MVar as MVar + +import qualified Data.List as List +import Data.ByteString (hPut, hGetLine) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.SExpr +import Fig.Utils.Net + +newtype BusState = BusState + { subscriptions :: Map SExpr [Handle] + } + +subscribe :: SExpr -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: SExpr -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +publish :: SExpr -> [SExpr] -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [SExpr]) + pure \h peer -> + ( do + forever do + line <- throwLeft id . decodeUtf8' =<< hGetLine h + case parseSExpr line of + Just (SExprList (SExprSymbol "ping":_)) -> do + log $ tshow peer <> " pinged" + hPut h . encodeUtf8 $ "(pong)\n" + Just (SExprList [SExprSymbol "sub", ev]) -> do + log $ tshow peer <> " subscribing to: " <> pretty ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + Just (SExprList (SExprSymbol "pub":ev:d)) -> do + log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev + publish ev d =<< MVar.readMVar st + Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x + Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/Client.hs new file mode 100644 index 0000000..6d72ad4 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Client.hs @@ -0,0 +1,65 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc + +import Data.ByteString (hPut, hGetLine) + +import Fig.Utils.Net +import Fig.Utils.SExpr + +data Commands m = Commands + { ping :: m () + , subscribe :: SExpr -> m () + , publish :: SExpr -> [SExpr] -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO . void . Conc.forkIO $ onConn cmds + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO $ exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> putStrLn $ "Received: " <> pretty d) + (pure ()) |
