From dcef0b65069fb38fd0f6c4382353167f603ebff1 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Thu, 16 Nov 2023 19:06:43 -0500 Subject: Initial commit --- fig-bus/src/Fig/Bus.hs | 62 +++++++++++++++++++++++++++++++++++++++++ fig-bus/src/Fig/Bus/Client.hs | 65 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 fig-bus/src/Fig/Bus.hs create mode 100644 fig-bus/src/Fig/Bus/Client.hs (limited to 'fig-bus/src/Fig') diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus.hs new file mode 100644 index 0000000..2102864 --- /dev/null +++ b/fig-bus/src/Fig/Bus.hs @@ -0,0 +1,62 @@ +module Fig.Bus (main) where + +import Fig.Prelude + +import Control.Concurrent.MVar as MVar + +import qualified Data.List as List +import Data.ByteString (hPut, hGetLine) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.SExpr +import Fig.Utils.Net + +newtype BusState = BusState + { subscriptions :: Map SExpr [Handle] + } + +subscribe :: SExpr -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: SExpr -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +publish :: SExpr -> [SExpr] -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [SExpr]) + pure \h peer -> + ( do + forever do + line <- throwLeft id . decodeUtf8' =<< hGetLine h + case parseSExpr line of + Just (SExprList (SExprSymbol "ping":_)) -> do + log $ tshow peer <> " pinged" + hPut h . encodeUtf8 $ "(pong)\n" + Just (SExprList [SExprSymbol "sub", ev]) -> do + log $ tshow peer <> " subscribing to: " <> pretty ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + Just (SExprList (SExprSymbol "pub":ev:d)) -> do + log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev + publish ev d =<< MVar.readMVar st + Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x + Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/Client.hs new file mode 100644 index 0000000..6d72ad4 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Client.hs @@ -0,0 +1,65 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc + +import Data.ByteString (hPut, hGetLine) + +import Fig.Utils.Net +import Fig.Utils.SExpr + +data Commands m = Commands + { ping :: m () + , subscribe :: SExpr -> m () + , publish :: SExpr -> [SExpr] -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO . void . Conc.forkIO $ onConn cmds + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO $ exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> putStrLn $ "Received: " <> pretty d) + (pure ()) -- cgit v1.2.3