From f010327cd1b9fd3b260ef1623d252723f395fbc7 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Tue, 6 May 2025 03:45:51 -0400 Subject: Binary message bus --- fig-bus/fig-bus.cabal | 3 +- fig-bus/main/Main.hs | 23 +++++++++++---- fig-bus/src/Fig/Bus/Binary.hs | 55 ++++++------------------------------ fig-bus/src/Fig/Bus/Binary/Client.hs | 30 +++++++++----------- fig-bus/src/Fig/Bus/Binary/Utils.hs | 44 +++++++++++++++++++++++++++++ flake.nix | 53 ++++++++++++++++++++++++++++------ 6 files changed, 129 insertions(+), 79 deletions(-) create mode 100644 fig-bus/src/Fig/Bus/Binary/Utils.hs diff --git a/fig-bus/fig-bus.cabal b/fig-bus/fig-bus.cabal index d8caf99..bfb2c7f 100644 --- a/fig-bus/fig-bus.cabal +++ b/fig-bus/fig-bus.cabal @@ -33,8 +33,9 @@ library hs-source-dirs: src exposed-modules: Fig.Bus.SExp - Fig.Bus.Binary Fig.Bus.SExp.Client + Fig.Bus.Binary + Fig.Bus.Binary.Utils Fig.Bus.Binary.Client executable fig-bus diff --git a/fig-bus/main/Main.hs b/fig-bus/main/Main.hs index dcafffd..bf5c170 100644 --- a/fig-bus/main/Main.hs +++ b/fig-bus/main/Main.hs @@ -4,18 +4,28 @@ import Fig.Prelude import Options.Applicative -import qualified Fig.Bus -import qualified Fig.Bus.Binary +import qualified Fig.Bus.SExp as SExp +import qualified Fig.Bus.Binary as Binary + +data Command = SExp | Binary + +parseCommand :: Parser Command +parseCommand = subparser $ mconcat + [ command "sexp" $ info (pure SExp) (progDesc "Launch the s-expression bus") + , command "binary" $ info (pure Binary) (progDesc "Launch the binary bus") + ] data Opts = Opts - { host :: Text - , port :: Text + { host :: !Text + , port :: !Text + , cmd :: !Command } parseOpts :: Parser Opts parseOpts = Opts <$> strOption (long "host" <> metavar "HOST" <> help "Interface to bind" <> value "localhost") <*> strOption (long "port" <> metavar "PORT" <> help "Port to bind" <> showDefault <> value "32050") + <*> parseCommand main :: IO () main = do @@ -23,5 +33,6 @@ main = do ( fullDesc <> header "fig-bus - a pub/sub message bus" ) - -- Fig.Bus.main (Just opts.host, opts.port) - Fig.Bus.Binary.main (Just opts.host, opts.port) + case opts.cmd of + SExp -> SExp.main (Just opts.host, opts.port) + Binary -> Binary.main (Just opts.host, opts.port) diff --git a/fig-bus/src/Fig/Bus/Binary.hs b/fig-bus/src/Fig/Bus/Binary.hs index 6329d01..5f465ec 100644 --- a/fig-bus/src/Fig/Bus/Binary.hs +++ b/fig-bus/src/Fig/Bus/Binary.hs @@ -5,19 +5,15 @@ import Fig.Prelude import Control.Monad (when) import Control.Concurrent.MVar as MVar -import Data.Word (Word8, Word32) -import Data.Bits ((.|.), (.&.), shiftL, shiftR) import qualified Data.List as List -import Data.ByteString (hPut, hGet) +import Data.ByteString (hGet) import qualified Data.ByteString as BS import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.IORef as IORef import Fig.Utils.Net - -newtype EventType = EventType ByteString - deriving (Show, Eq, Ord) +import Fig.Bus.Binary.Utils newtype BusState = BusState { subscriptions :: Map EventType [Handle] @@ -33,39 +29,6 @@ unsubscribe ev h bs = bs { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions } -intFromLEBytes :: [Word8] -> Int -intFromLEBytes [] = 0 -intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x - -readLengthPrefixed :: Handle -> IO (Maybe ByteString) -readLengthPrefixed h = do - n <- hGet h 4 - log $ "parsing: " <> tshow n - case intFromLEBytes (BS.unpack n) of - 0 -> pure Nothing - len -> do - log $ "reading: " <> tshow len - x <- hGet h len - pure $ Just x - -readEvent :: Handle -> IO (Maybe EventType) -readEvent h = do - mb <- readLengthPrefixed h - pure $ EventType <$> mb - -writeLengthPrefixed :: Handle -> ByteString -> IO () -writeLengthPrefixed h d = do - let l :: Word32 = fromIntegral $ BS.length d - let bytes = - [ fromIntegral $ l .&. 0xff - , fromIntegral $ shiftR l 8 .&. 0xff - , fromIntegral $ shiftR l 16 .&. 0xff - , fromIntegral $ shiftR l 24 .&. 0xff - ] - hPut h $ BS.pack bytes <> d - -writeEvent :: Handle -> EventType -> IO () -writeEvent h (EventType d) = writeLengthPrefixed h d publish :: EventType -> ByteString -> BusState -> IO () publish ev d bs = @@ -88,19 +51,19 @@ main bind = do when (BS.length c == 1) do case BS.head c of 115 -> readEvent h >>= \case - Just ev -> do - log $ tshow peer <> " subscribing to: " <> tshow ev + Just ev@(EventType e) -> do + log $ tshow peer <> " subscribing to: " <> tshow e IORef.modifyIORef' subs (ev:) MVar.modifyMVar_ st (pure . subscribe ev h) go - _ -> log "malformed subscription" + _else -> log "malformed subscription" 112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case - (Just ev, Just d) -> do - log $ tshow peer <> " publishing to: " <> tshow ev + (Just ev@(EventType e), Just d) -> do + log $ tshow peer <> " publishing to: " <> tshow e publish ev d =<< MVar.readMVar st go - _ -> log "malformed publish" - _ -> log "unknown" + _else -> log "malformed publish" + w -> log $ "unknown command code: " <> tshow w go , do ss <- IORef.readIORef subs diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs index a41f300..a2e601f 100644 --- a/fig-bus/src/Fig/Bus/Binary/Client.hs +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -1,5 +1,3 @@ -{-# Language QuasiQuotes #-} - module Fig.Bus.Binary.Client (Commands(..), busClient) where import Fig.Prelude @@ -9,14 +7,14 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as Conc import qualified Control.Concurrent.Async as Async -import Data.ByteString (hPut, hGet) +import Data.ByteString (hPut) import Fig.Utils.Net -import Fig.Bus.Binary +import Fig.Bus.Binary.Utils data Commands m = Commands - { subscribe :: EventType -> m () - , publish :: EventType -> ByteString -> m () + { subscribe :: !(ByteString -> m ()) + , publish :: !(ByteString -> ByteString -> m ()) } newtype FigBusClientException = FigBusClientException Text @@ -27,16 +25,16 @@ busClient :: forall m. (MonadIO m, MonadThrow m, MonadMask m) => (Text, Text) -> (Commands IO -> IO ()) -> - (Commands IO -> SExpr -> IO ()) -> + (Commands IO -> ByteString -> ByteString -> IO ()) -> IO () -> m () busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> let cmds = Commands - { subscribe = \(EventType ev) -> do + { subscribe = \ev -> do hPut h "s" writeLengthPrefixed h ev - , publish = \(EventType ev) d -> do + , publish = \ev d -> do hPut h "p" writeLengthPrefixed h ev writeLengthPrefixed h d @@ -45,11 +43,9 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu ( do liftIO $ Async.concurrently_ (onConn cmds) do forever do - - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x + (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case + (Just ev, Just d) -> liftIO $ onData cmds ev d + _else -> throwM . FigBusClientException $ "Server sent malformed data" , liftIO onQuit ) where @@ -60,10 +56,10 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu _testClient :: IO () _testClient = busClient ("localhost", "32050") (\cmds -> do - cmds.subscribe [sexp|foo|] + cmds.subscribe "foo" forever do Conc.threadDelay 1000000 - cmds.publish [sexp|bar|] [[sexp|42|]] + cmds.publish "bar" "42" ) - (\_cmds d -> log $ "Received: " <> pretty d) + (\_cmds ev d -> log $ "Received: " <> tshow (ev, d)) (pure ()) diff --git a/fig-bus/src/Fig/Bus/Binary/Utils.hs b/fig-bus/src/Fig/Bus/Binary/Utils.hs new file mode 100644 index 0000000..734c7c9 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary/Utils.hs @@ -0,0 +1,44 @@ +module Fig.Bus.Binary.Utils where + +import Fig.Prelude + +import Data.Word (Word8, Word32) +import Data.Bits ((.|.), (.&.), shiftL, shiftR) +import Data.ByteString (hPut, hGet) +import qualified Data.ByteString as BS + +newtype EventType = EventType ByteString + deriving (Show, Eq, Ord) + +intFromLEBytes :: [Word8] -> Int +intFromLEBytes [] = 0 +intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x + +readLengthPrefixed :: Handle -> IO (Maybe ByteString) +readLengthPrefixed h = do + n <- hGet h 4 + case intFromLEBytes (BS.unpack n) of + 0 -> pure Nothing + len -> do + x <- hGet h len + pure $ Just x + + +readEvent :: Handle -> IO (Maybe EventType) +readEvent h = do + mb <- readLengthPrefixed h + pure $ EventType <$> mb + +writeLengthPrefixed :: Handle -> ByteString -> IO () +writeLengthPrefixed h d = do + let l :: Word32 = fromIntegral $ BS.length d + let bytes = + [ fromIntegral $ l .&. 0xff + , fromIntegral $ shiftR l 8 .&. 0xff + , fromIntegral $ shiftR l 16 .&. 0xff + , fromIntegral $ shiftR l 24 .&. 0xff + ] + hPut h $ BS.pack bytes <> d + +writeEvent :: Handle -> EventType -> IO () +writeEvent h (EventType d) = writeLengthPrefixed h d diff --git a/flake.nix b/flake.nix index ca71b8b..6d7dbfc 100644 --- a/flake.nix +++ b/flake.nix @@ -53,11 +53,11 @@ overrides = haskellOverrides; }; - figBusModule = { config, lib, ... }: + figBusSExpModule = { config, lib, ... }: let - cfg = config.colonq.services.fig-bus; + cfg = config.colonq.services.fig-bus-sexp; in { - options.colonq.services.fig-bus = { + options.colonq.services.fig-bus-sexp = { enable = lib.mkEnableOption "Enable the fig message bus"; host = lib.mkOption { type = lib.types.str; @@ -71,17 +71,51 @@ }; }; config = lib.mkIf cfg.enable { - systemd.services."colonq.fig-bus" = { + systemd.services."colonq.fig-bus-sexp" = { wantedBy = ["network-online.target"]; serviceConfig = { Restart = "on-failure"; - ExecStart = "${haskellPackages.fig-bus}/bin/fig-bus --host ${cfg.host} --port ${toString cfg.port}"; + ExecStart = "${haskellPackages.fig-bus}/bin/fig-bus sexp --host ${cfg.host} --port ${toString cfg.port}"; DynamicUser = "yes"; - RuntimeDirectory = "colonq.fig-bus"; + RuntimeDirectory = "colonq.fig-bus-sexp"; RuntimeDirectoryMode = "0755"; - StateDirectory = "colonq.fig-bus"; + StateDirectory = "colonq.fig-bus-sexp"; StateDirectoryMode = "0700"; - CacheDirectory = "colonq.fig-bus"; + CacheDirectory = "colonq.fig-bus-sexp"; + CacheDirectoryMode = "0750"; + }; + }; + }; + }; + figBusBinaryModule = { config, lib, ... }: + let + cfg = config.colonq.services.fig-bus-binary; + in { + options.colonq.services.fig-bus-binary = { + enable = lib.mkEnableOption "Enable the fig message bus"; + host = lib.mkOption { + type = lib.types.str; + default = "127.0.0.1"; + description = "The host bound by the fig server"; + }; + port = lib.mkOption { + type = lib.types.port; + default = 32051; + description = "The port bound by the fig server"; + }; + }; + config = lib.mkIf cfg.enable { + systemd.services."colonq.fig-bus-binary" = { + wantedBy = ["network-online.target"]; + serviceConfig = { + Restart = "on-failure"; + ExecStart = "${haskellPackages.fig-bus}/bin/fig-bus binary --host ${cfg.host} --port ${toString cfg.port}"; + DynamicUser = "yes"; + RuntimeDirectory = "colonq.fig-bus-binary"; + RuntimeDirectoryMode = "0755"; + StateDirectory = "colonq.fig-bus-binary"; + StateDirectoryMode = "0700"; + CacheDirectory = "colonq.fig-bus-binary"; CacheDirectoryMode = "0750"; }; }; @@ -401,7 +435,8 @@ program = "${haskellPackages.fig-bus}/bin/fig-bus"; }; nixosModules = { - figBus = figBusModule; + figBusSExp = figBusSExpModule; + figBusBinary = figBusBinaryModule; figMonitorTwitchLiveWatcher = figMonitorTwitchLiveWatcherModule; figMonitorDiscord = figMonitorDiscordModule; figMonitorIRC = figMonitorIRCModule; -- cgit v1.2.3