diff options
| author | LLLL Colonq <llll@colonq> | 2023-11-16 19:06:43 -0500 |
|---|---|---|
| committer | LLLL Colonq <llll@colonq> | 2023-11-16 19:06:43 -0500 |
| commit | dcef0b65069fb38fd0f6c4382353167f603ebff1 (patch) | |
| tree | 45954ffe308c3dd056e6af4f734e6d2af89e5856 /fig-bus/src/Fig/Bus.hs | |
Initial commit
Diffstat (limited to 'fig-bus/src/Fig/Bus.hs')
| -rw-r--r-- | fig-bus/src/Fig/Bus.hs | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus.hs new file mode 100644 index 0000000..2102864 --- /dev/null +++ b/fig-bus/src/Fig/Bus.hs @@ -0,0 +1,62 @@ +module Fig.Bus (main) where + +import Fig.Prelude + +import Control.Concurrent.MVar as MVar + +import qualified Data.List as List +import Data.ByteString (hPut, hGetLine) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.SExpr +import Fig.Utils.Net + +newtype BusState = BusState + { subscriptions :: Map SExpr [Handle] + } + +subscribe :: SExpr -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: SExpr -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +publish :: SExpr -> [SExpr] -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n" + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [SExpr]) + pure \h peer -> + ( do + forever do + line <- throwLeft id . decodeUtf8' =<< hGetLine h + case parseSExpr line of + Just (SExprList (SExprSymbol "ping":_)) -> do + log $ tshow peer <> " pinged" + hPut h . encodeUtf8 $ "(pong)\n" + Just (SExprList [SExprSymbol "sub", ev]) -> do + log $ tshow peer <> " subscribing to: " <> pretty ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + Just (SExprList (SExprSymbol "pub":ev:d)) -> do + log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev + publish ev d =<< MVar.readMVar st + Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x + Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) |
