From f010327cd1b9fd3b260ef1623d252723f395fbc7 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Tue, 6 May 2025 03:45:51 -0400 Subject: Binary message bus --- fig-bus/src/Fig/Bus/Binary.hs | 55 ++++++------------------------------ fig-bus/src/Fig/Bus/Binary/Client.hs | 30 +++++++++----------- fig-bus/src/Fig/Bus/Binary/Utils.hs | 44 +++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 63 deletions(-) create mode 100644 fig-bus/src/Fig/Bus/Binary/Utils.hs (limited to 'fig-bus/src/Fig/Bus') diff --git a/fig-bus/src/Fig/Bus/Binary.hs b/fig-bus/src/Fig/Bus/Binary.hs index 6329d01..5f465ec 100644 --- a/fig-bus/src/Fig/Bus/Binary.hs +++ b/fig-bus/src/Fig/Bus/Binary.hs @@ -5,19 +5,15 @@ import Fig.Prelude import Control.Monad (when) import Control.Concurrent.MVar as MVar -import Data.Word (Word8, Word32) -import Data.Bits ((.|.), (.&.), shiftL, shiftR) import qualified Data.List as List -import Data.ByteString (hPut, hGet) +import Data.ByteString (hGet) import qualified Data.ByteString as BS import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.IORef as IORef import Fig.Utils.Net - -newtype EventType = EventType ByteString - deriving (Show, Eq, Ord) +import Fig.Bus.Binary.Utils newtype BusState = BusState { subscriptions :: Map EventType [Handle] @@ -33,39 +29,6 @@ unsubscribe ev h bs = bs { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions } -intFromLEBytes :: [Word8] -> Int -intFromLEBytes [] = 0 -intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x - -readLengthPrefixed :: Handle -> IO (Maybe ByteString) -readLengthPrefixed h = do - n <- hGet h 4 - log $ "parsing: " <> tshow n - case intFromLEBytes (BS.unpack n) of - 0 -> pure Nothing - len -> do - log $ "reading: " <> tshow len - x <- hGet h len - pure $ Just x - -readEvent :: Handle -> IO (Maybe EventType) -readEvent h = do - mb <- readLengthPrefixed h - pure $ EventType <$> mb - -writeLengthPrefixed :: Handle -> ByteString -> IO () -writeLengthPrefixed h d = do - let l :: Word32 = fromIntegral $ BS.length d - let bytes = - [ fromIntegral $ l .&. 0xff - , fromIntegral $ shiftR l 8 .&. 0xff - , fromIntegral $ shiftR l 16 .&. 0xff - , fromIntegral $ shiftR l 24 .&. 0xff - ] - hPut h $ BS.pack bytes <> d - -writeEvent :: Handle -> EventType -> IO () -writeEvent h (EventType d) = writeLengthPrefixed h d publish :: EventType -> ByteString -> BusState -> IO () publish ev d bs = @@ -88,19 +51,19 @@ main bind = do when (BS.length c == 1) do case BS.head c of 115 -> readEvent h >>= \case - Just ev -> do - log $ tshow peer <> " subscribing to: " <> tshow ev + Just ev@(EventType e) -> do + log $ tshow peer <> " subscribing to: " <> tshow e IORef.modifyIORef' subs (ev:) MVar.modifyMVar_ st (pure . subscribe ev h) go - _ -> log "malformed subscription" + _else -> log "malformed subscription" 112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case - (Just ev, Just d) -> do - log $ tshow peer <> " publishing to: " <> tshow ev + (Just ev@(EventType e), Just d) -> do + log $ tshow peer <> " publishing to: " <> tshow e publish ev d =<< MVar.readMVar st go - _ -> log "malformed publish" - _ -> log "unknown" + _else -> log "malformed publish" + w -> log $ "unknown command code: " <> tshow w go , do ss <- IORef.readIORef subs diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs index a41f300..a2e601f 100644 --- a/fig-bus/src/Fig/Bus/Binary/Client.hs +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -1,5 +1,3 @@ -{-# Language QuasiQuotes #-} - module Fig.Bus.Binary.Client (Commands(..), busClient) where import Fig.Prelude @@ -9,14 +7,14 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as Conc import qualified Control.Concurrent.Async as Async -import Data.ByteString (hPut, hGet) +import Data.ByteString (hPut) import Fig.Utils.Net -import Fig.Bus.Binary +import Fig.Bus.Binary.Utils data Commands m = Commands - { subscribe :: EventType -> m () - , publish :: EventType -> ByteString -> m () + { subscribe :: !(ByteString -> m ()) + , publish :: !(ByteString -> ByteString -> m ()) } newtype FigBusClientException = FigBusClientException Text @@ -27,16 +25,16 @@ busClient :: forall m. (MonadIO m, MonadThrow m, MonadMask m) => (Text, Text) -> (Commands IO -> IO ()) -> - (Commands IO -> SExpr -> IO ()) -> + (Commands IO -> ByteString -> ByteString -> IO ()) -> IO () -> m () busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> let cmds = Commands - { subscribe = \(EventType ev) -> do + { subscribe = \ev -> do hPut h "s" writeLengthPrefixed h ev - , publish = \(EventType ev) d -> do + , publish = \ev d -> do hPut h "p" writeLengthPrefixed h ev writeLengthPrefixed h d @@ -45,11 +43,9 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu ( do liftIO $ Async.concurrently_ (onConn cmds) do forever do - - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x + (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case + (Just ev, Just d) -> liftIO $ onData cmds ev d + _else -> throwM . FigBusClientException $ "Server sent malformed data" , liftIO onQuit ) where @@ -60,10 +56,10 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu _testClient :: IO () _testClient = busClient ("localhost", "32050") (\cmds -> do - cmds.subscribe [sexp|foo|] + cmds.subscribe "foo" forever do Conc.threadDelay 1000000 - cmds.publish [sexp|bar|] [[sexp|42|]] + cmds.publish "bar" "42" ) - (\_cmds d -> log $ "Received: " <> pretty d) + (\_cmds ev d -> log $ "Received: " <> tshow (ev, d)) (pure ()) diff --git a/fig-bus/src/Fig/Bus/Binary/Utils.hs b/fig-bus/src/Fig/Bus/Binary/Utils.hs new file mode 100644 index 0000000..734c7c9 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary/Utils.hs @@ -0,0 +1,44 @@ +module Fig.Bus.Binary.Utils where + +import Fig.Prelude + +import Data.Word (Word8, Word32) +import Data.Bits ((.|.), (.&.), shiftL, shiftR) +import Data.ByteString (hPut, hGet) +import qualified Data.ByteString as BS + +newtype EventType = EventType ByteString + deriving (Show, Eq, Ord) + +intFromLEBytes :: [Word8] -> Int +intFromLEBytes [] = 0 +intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x + +readLengthPrefixed :: Handle -> IO (Maybe ByteString) +readLengthPrefixed h = do + n <- hGet h 4 + case intFromLEBytes (BS.unpack n) of + 0 -> pure Nothing + len -> do + x <- hGet h len + pure $ Just x + + +readEvent :: Handle -> IO (Maybe EventType) +readEvent h = do + mb <- readLengthPrefixed h + pure $ EventType <$> mb + +writeLengthPrefixed :: Handle -> ByteString -> IO () +writeLengthPrefixed h d = do + let l :: Word32 = fromIntegral $ BS.length d + let bytes = + [ fromIntegral $ l .&. 0xff + , fromIntegral $ shiftR l 8 .&. 0xff + , fromIntegral $ shiftR l 16 .&. 0xff + , fromIntegral $ shiftR l 24 .&. 0xff + ] + hPut h $ BS.pack bytes <> d + +writeEvent :: Handle -> EventType -> IO () +writeEvent h (EventType d) = writeLengthPrefixed h d -- cgit v1.2.3