From 1f2e453d0c9f8412b9032cb4e655713ecdcf1fa3 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Mon, 26 May 2025 04:43:38 -0400 Subject: web: Refactor major style --- fig-bus/src/Fig/Bus/SExp.hs | 62 ---------------------------------- fig-bus/src/Fig/Bus/SExp/Client.hs | 66 ------------------------------------- fig-bus/src/Fig/Bus/SExpr.hs | 62 ++++++++++++++++++++++++++++++++++ fig-bus/src/Fig/Bus/SExpr/Client.hs | 66 +++++++++++++++++++++++++++++++++++++ 4 files changed, 128 insertions(+), 128 deletions(-) delete mode 100644 fig-bus/src/Fig/Bus/SExp.hs delete mode 100644 fig-bus/src/Fig/Bus/SExp/Client.hs create mode 100644 fig-bus/src/Fig/Bus/SExpr.hs create mode 100644 fig-bus/src/Fig/Bus/SExpr/Client.hs (limited to 'fig-bus/src/Fig/Bus') diff --git a/fig-bus/src/Fig/Bus/SExp.hs b/fig-bus/src/Fig/Bus/SExp.hs deleted file mode 100644 index ddd2896..0000000 --- a/fig-bus/src/Fig/Bus/SExp.hs +++ /dev/null @@ -1,62 +0,0 @@ -module Fig.Bus.SExp (main) where - -import Fig.Prelude - -import Control.Concurrent.MVar as MVar - -import qualified Data.List as List -import Data.ByteString (hPut, hGetLine) -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import qualified Data.IORef as IORef - -import Fig.Utils.SExpr -import Fig.Utils.Net - -newtype BusState = BusState - { subscriptions :: Map SExpr [Handle] - } - -subscribe :: SExpr -> Handle -> BusState -> BusState -subscribe ev h bs = bs - { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions - } - -unsubscribe :: SExpr -> Handle -> BusState -> BusState -unsubscribe ev h bs = bs - { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions - } - -publish :: SExpr -> [SExpr] -> BusState -> IO () -publish ev d bs = - case Map.lookup ev bs.subscriptions of - Nothing -> pure () - Just hs -> forM_ hs \h -> do - hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" - -main :: (Maybe Text, Text) -> IO () -main bind = do - st <- MVar.newMVar $ BusState { subscriptions = Map.empty } - server bind do - subs <- IORef.newIORef ([] :: [SExpr]) - pure \h peer -> - ( do - forever do - line <- throwLeft id . decodeUtf8' =<< hGetLine h - case parseSExpr line of - Just (SExprList (SExprSymbol "ping":_)) -> do - log $ tshow peer <> " pinged" - hPut h . encodeUtf8 $ "(pong)\n" - Just (SExprList [SExprSymbol "sub", ev]) -> do - log $ tshow peer <> " subscribing to: " <> pretty ev - IORef.modifyIORef' subs (ev:) - MVar.modifyMVar_ st (pure . subscribe ev h) - Just (SExprList (SExprSymbol "pub":ev:d)) -> do - log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev - publish ev d =<< MVar.readMVar st - Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x - Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line - , do - ss <- IORef.readIORef subs - MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss - ) diff --git a/fig-bus/src/Fig/Bus/SExp/Client.hs b/fig-bus/src/Fig/Bus/SExp/Client.hs deleted file mode 100644 index 780f78f..0000000 --- a/fig-bus/src/Fig/Bus/SExp/Client.hs +++ /dev/null @@ -1,66 +0,0 @@ -{-# Language QuasiQuotes #-} - -module Fig.Bus.SExp.Client (Commands(..), busClient) where - -import Fig.Prelude - -import System.Exit (exitFailure) - -import qualified Control.Concurrent as Conc -import qualified Control.Concurrent.Async as Async - -import Data.ByteString (hPut, hGetLine) - -import Fig.Utils.Net -import Fig.Utils.SExpr - -data Commands m = Commands - { ping :: m () - , subscribe :: SExpr -> m () - , publish :: SExpr -> [SExpr] -> m () - } - -newtype FigBusClientException = FigBusClientException Text - deriving (Show, Eq, Ord) -instance Exception FigBusClientException - -busClient :: forall m. - (MonadIO m, MonadThrow m, MonadMask m) => - (Text, Text) -> - (Commands IO -> IO ()) -> - (Commands IO -> SExpr -> IO ()) -> - IO () -> - m () -busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> - let - sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" - cmds = Commands - { ping = sendSexpr [sexp|(ping)|] - , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] - , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] - } - in - ( do - liftIO $ Async.concurrently_ (onConn cmds) do - forever do - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x - , liftIO onQuit - ) - where - catchFailure body = catch body \(e :: IOException) -> do - log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e - liftIO exitFailure - -_testClient :: IO () -_testClient = busClient ("localhost", "32050") - (\cmds -> do - cmds.subscribe [sexp|foo|] - forever do - Conc.threadDelay 1000000 - cmds.publish [sexp|bar|] [[sexp|42|]] - ) - (\_cmds d -> log $ "Received: " <> pretty d) - (pure ()) diff --git a/fig-bus/src/Fig/Bus/SExpr.hs b/fig-bus/src/Fig/Bus/SExpr.hs new file mode 100644 index 0000000..49cee90 --- /dev/null +++ b/fig-bus/src/Fig/Bus/SExpr.hs @@ -0,0 +1,62 @@ +module Fig.Bus.SExpr (main) where + +import Fig.Prelude + +import Control.Concurrent.MVar as MVar + +import qualified Data.List as List +import Data.ByteString (hPut, hGetLine) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.SExpr +import Fig.Utils.Net + +newtype BusState = BusState + { subscriptions :: Map SExpr [Handle] + } + +subscribe :: SExpr -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: SExpr -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +publish :: SExpr -> [SExpr] -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [SExpr]) + pure \h peer -> + ( do + forever do + line <- throwLeft id . decodeUtf8' =<< hGetLine h + case parseSExpr line of + Just (SExprList (SExprSymbol "ping":_)) -> do + log $ tshow peer <> " pinged" + hPut h . encodeUtf8 $ "(pong)\n" + Just (SExprList [SExprSymbol "sub", ev]) -> do + log $ tshow peer <> " subscribing to: " <> pretty ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + Just (SExprList (SExprSymbol "pub":ev:d)) -> do + log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev + publish ev d =<< MVar.readMVar st + Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x + Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) diff --git a/fig-bus/src/Fig/Bus/SExpr/Client.hs b/fig-bus/src/Fig/Bus/SExpr/Client.hs new file mode 100644 index 0000000..ccd41a7 --- /dev/null +++ b/fig-bus/src/Fig/Bus/SExpr/Client.hs @@ -0,0 +1,66 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.SExpr.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc +import qualified Control.Concurrent.Async as Async + +import Data.ByteString (hPut, hGetLine) + +import Fig.Utils.Net +import Fig.Utils.SExpr + +data Commands m = Commands + { ping :: m () + , subscribe :: SExpr -> m () + , publish :: SExpr -> [SExpr] -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> log $ "Received: " <> pretty d) + (pure ()) -- cgit v1.2.3