From 2bac23772ea3b8e95e27bcd4f8d9c4d91538f840 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Tue, 6 May 2025 01:52:24 -0400 Subject: Binary bus --- fig-bus/src/Fig/Bus.hs | 62 -------------------- fig-bus/src/Fig/Bus/Binary.hs | 108 +++++++++++++++++++++++++++++++++++ fig-bus/src/Fig/Bus/Binary/Client.hs | 69 ++++++++++++++++++++++ fig-bus/src/Fig/Bus/Client.hs | 66 --------------------- fig-bus/src/Fig/Bus/SExp.hs | 62 ++++++++++++++++++++ fig-bus/src/Fig/Bus/SExp/Client.hs | 66 +++++++++++++++++++++ 6 files changed, 305 insertions(+), 128 deletions(-) delete mode 100644 fig-bus/src/Fig/Bus.hs create mode 100644 fig-bus/src/Fig/Bus/Binary.hs create mode 100644 fig-bus/src/Fig/Bus/Binary/Client.hs delete mode 100644 fig-bus/src/Fig/Bus/Client.hs create mode 100644 fig-bus/src/Fig/Bus/SExp.hs create mode 100644 fig-bus/src/Fig/Bus/SExp/Client.hs (limited to 'fig-bus/src/Fig') diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus.hs deleted file mode 100644 index 2102864..0000000 --- a/fig-bus/src/Fig/Bus.hs +++ /dev/null @@ -1,62 +0,0 @@ -module Fig.Bus (main) where - -import Fig.Prelude - -import Control.Concurrent.MVar as MVar - -import qualified Data.List as List -import Data.ByteString (hPut, hGetLine) -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import qualified Data.IORef as IORef - -import Fig.Utils.SExpr -import Fig.Utils.Net - -newtype BusState = BusState - { subscriptions :: Map SExpr [Handle] - } - -subscribe :: SExpr -> Handle -> BusState -> BusState -subscribe ev h bs = bs - { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions - } - -unsubscribe :: SExpr -> Handle -> BusState -> BusState -unsubscribe ev h bs = bs - { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions - } - -publish :: SExpr -> [SExpr] -> BusState -> IO () -publish ev d bs = - case Map.lookup ev bs.subscriptions of - Nothing -> pure () - Just hs -> forM_ hs \h -> do - hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" - -main :: (Maybe Text, Text) -> IO () -main bind = do - st <- MVar.newMVar $ BusState { subscriptions = Map.empty } - server bind do - subs <- IORef.newIORef ([] :: [SExpr]) - pure \h peer -> - ( do - forever do - line <- throwLeft id . decodeUtf8' =<< hGetLine h - case parseSExpr line of - Just (SExprList (SExprSymbol "ping":_)) -> do - log $ tshow peer <> " pinged" - hPut h . encodeUtf8 $ "(pong)\n" - Just (SExprList [SExprSymbol "sub", ev]) -> do - log $ tshow peer <> " subscribing to: " <> pretty ev - IORef.modifyIORef' subs (ev:) - MVar.modifyMVar_ st (pure . subscribe ev h) - Just (SExprList (SExprSymbol "pub":ev:d)) -> do - log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev - publish ev d =<< MVar.readMVar st - Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x - Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line - , do - ss <- IORef.readIORef subs - MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss - ) diff --git a/fig-bus/src/Fig/Bus/Binary.hs b/fig-bus/src/Fig/Bus/Binary.hs new file mode 100644 index 0000000..6329d01 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary.hs @@ -0,0 +1,108 @@ +module Fig.Bus.Binary (main) where + +import Fig.Prelude + +import Control.Monad (when) +import Control.Concurrent.MVar as MVar + +import Data.Word (Word8, Word32) +import Data.Bits ((.|.), (.&.), shiftL, shiftR) +import qualified Data.List as List +import Data.ByteString (hPut, hGet) +import qualified Data.ByteString as BS +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.Net + +newtype EventType = EventType ByteString + deriving (Show, Eq, Ord) + +newtype BusState = BusState + { subscriptions :: Map EventType [Handle] + } + +subscribe :: EventType -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: EventType -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +intFromLEBytes :: [Word8] -> Int +intFromLEBytes [] = 0 +intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x + +readLengthPrefixed :: Handle -> IO (Maybe ByteString) +readLengthPrefixed h = do + n <- hGet h 4 + log $ "parsing: " <> tshow n + case intFromLEBytes (BS.unpack n) of + 0 -> pure Nothing + len -> do + log $ "reading: " <> tshow len + x <- hGet h len + pure $ Just x + +readEvent :: Handle -> IO (Maybe EventType) +readEvent h = do + mb <- readLengthPrefixed h + pure $ EventType <$> mb + +writeLengthPrefixed :: Handle -> ByteString -> IO () +writeLengthPrefixed h d = do + let l :: Word32 = fromIntegral $ BS.length d + let bytes = + [ fromIntegral $ l .&. 0xff + , fromIntegral $ shiftR l 8 .&. 0xff + , fromIntegral $ shiftR l 16 .&. 0xff + , fromIntegral $ shiftR l 24 .&. 0xff + ] + hPut h $ BS.pack bytes <> d + +writeEvent :: Handle -> EventType -> IO () +writeEvent h (EventType d) = writeLengthPrefixed h d + +publish :: EventType -> ByteString -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + writeEvent h ev + writeLengthPrefixed h d + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [EventType]) + pure \h peer -> + ( do + let + go = do + c <- hGet h 1 + when (BS.length c == 1) do + case BS.head c of + 115 -> readEvent h >>= \case + Just ev -> do + log $ tshow peer <> " subscribing to: " <> tshow ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + go + _ -> log "malformed subscription" + 112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case + (Just ev, Just d) -> do + log $ tshow peer <> " publishing to: " <> tshow ev + publish ev d =<< MVar.readMVar st + go + _ -> log "malformed publish" + _ -> log "unknown" + go + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs new file mode 100644 index 0000000..a41f300 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -0,0 +1,69 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.Binary.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc +import qualified Control.Concurrent.Async as Async + +import Data.ByteString (hPut, hGet) + +import Fig.Utils.Net +import Fig.Bus.Binary + +data Commands m = Commands + { subscribe :: EventType -> m () + , publish :: EventType -> ByteString -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + cmds = Commands + { subscribe = \(EventType ev) -> do + hPut h "s" + writeLengthPrefixed h ev + , publish = \(EventType ev) d -> do + hPut h "p" + writeLengthPrefixed h ev + writeLengthPrefixed h d + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> log $ "Received: " <> pretty d) + (pure ()) diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/Client.hs deleted file mode 100644 index d2c6b14..0000000 --- a/fig-bus/src/Fig/Bus/Client.hs +++ /dev/null @@ -1,66 +0,0 @@ -{-# Language QuasiQuotes #-} - -module Fig.Bus.Client (Commands(..), busClient) where - -import Fig.Prelude - -import System.Exit (exitFailure) - -import qualified Control.Concurrent as Conc -import qualified Control.Concurrent.Async as Async - -import Data.ByteString (hPut, hGetLine) - -import Fig.Utils.Net -import Fig.Utils.SExpr - -data Commands m = Commands - { ping :: m () - , subscribe :: SExpr -> m () - , publish :: SExpr -> [SExpr] -> m () - } - -newtype FigBusClientException = FigBusClientException Text - deriving (Show, Eq, Ord) -instance Exception FigBusClientException - -busClient :: forall m. - (MonadIO m, MonadThrow m, MonadMask m) => - (Text, Text) -> - (Commands IO -> IO ()) -> - (Commands IO -> SExpr -> IO ()) -> - IO () -> - m () -busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> - let - sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" - cmds = Commands - { ping = sendSexpr [sexp|(ping)|] - , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] - , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] - } - in - ( do - liftIO $ Async.concurrently_ (onConn cmds) do - forever do - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x - , liftIO onQuit - ) - where - catchFailure body = catch body \(e :: IOException) -> do - log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e - liftIO exitFailure - -_testClient :: IO () -_testClient = busClient ("localhost", "32050") - (\cmds -> do - cmds.subscribe [sexp|foo|] - forever do - Conc.threadDelay 1000000 - cmds.publish [sexp|bar|] [[sexp|42|]] - ) - (\_cmds d -> log $ "Received: " <> pretty d) - (pure ()) diff --git a/fig-bus/src/Fig/Bus/SExp.hs b/fig-bus/src/Fig/Bus/SExp.hs new file mode 100644 index 0000000..ddd2896 --- /dev/null +++ b/fig-bus/src/Fig/Bus/SExp.hs @@ -0,0 +1,62 @@ +module Fig.Bus.SExp (main) where + +import Fig.Prelude + +import Control.Concurrent.MVar as MVar + +import qualified Data.List as List +import Data.ByteString (hPut, hGetLine) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.SExpr +import Fig.Utils.Net + +newtype BusState = BusState + { subscriptions :: Map SExpr [Handle] + } + +subscribe :: SExpr -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: SExpr -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +publish :: SExpr -> [SExpr] -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [SExpr]) + pure \h peer -> + ( do + forever do + line <- throwLeft id . decodeUtf8' =<< hGetLine h + case parseSExpr line of + Just (SExprList (SExprSymbol "ping":_)) -> do + log $ tshow peer <> " pinged" + hPut h . encodeUtf8 $ "(pong)\n" + Just (SExprList [SExprSymbol "sub", ev]) -> do + log $ tshow peer <> " subscribing to: " <> pretty ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + Just (SExprList (SExprSymbol "pub":ev:d)) -> do + log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev + publish ev d =<< MVar.readMVar st + Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x + Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) diff --git a/fig-bus/src/Fig/Bus/SExp/Client.hs b/fig-bus/src/Fig/Bus/SExp/Client.hs new file mode 100644 index 0000000..780f78f --- /dev/null +++ b/fig-bus/src/Fig/Bus/SExp/Client.hs @@ -0,0 +1,66 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.SExp.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc +import qualified Control.Concurrent.Async as Async + +import Data.ByteString (hPut, hGetLine) + +import Fig.Utils.Net +import Fig.Utils.SExpr + +data Commands m = Commands + { ping :: m () + , subscribe :: SExpr -> m () + , publish :: SExpr -> [SExpr] -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> log $ "Received: " <> pretty d) + (pure ()) -- cgit v1.2.3