diff options
| author | LLLL Colonq <llll@colonq> | 2025-05-06 01:52:24 -0400 |
|---|---|---|
| committer | LLLL Colonq <llll@colonq> | 2025-05-06 01:52:24 -0400 |
| commit | 2bac23772ea3b8e95e27bcd4f8d9c4d91538f840 (patch) | |
| tree | 691785e14429c6a70b827d87e95b1a9847307a6b /fig-bus | |
| parent | a8eaa15e20779320eafc2e70093a3dd632da01ac (diff) | |
Binary bus
Diffstat (limited to 'fig-bus')
| -rw-r--r-- | fig-bus/fig-bus.cabal | 6 | ||||
| -rw-r--r-- | fig-bus/main/Main.hs | 4 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/Binary.hs | 108 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/Binary/Client.hs | 69 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/SExp.hs (renamed from fig-bus/src/Fig/Bus.hs) | 2 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/SExp/Client.hs (renamed from fig-bus/src/Fig/Bus/Client.hs) | 2 |
6 files changed, 186 insertions, 5 deletions
diff --git a/fig-bus/fig-bus.cabal b/fig-bus/fig-bus.cabal index 327582b..d8caf99 100644 --- a/fig-bus/fig-bus.cabal +++ b/fig-bus/fig-bus.cabal @@ -32,8 +32,10 @@ library import: deps hs-source-dirs: src exposed-modules: - Fig.Bus - Fig.Bus.Client + Fig.Bus.SExp + Fig.Bus.Binary + Fig.Bus.SExp.Client + Fig.Bus.Binary.Client executable fig-bus import: defaults diff --git a/fig-bus/main/Main.hs b/fig-bus/main/Main.hs index 9f84fd2..dcafffd 100644 --- a/fig-bus/main/Main.hs +++ b/fig-bus/main/Main.hs @@ -5,6 +5,7 @@ import Fig.Prelude import Options.Applicative import qualified Fig.Bus +import qualified Fig.Bus.Binary data Opts = Opts { host :: Text @@ -22,4 +23,5 @@ main = do ( fullDesc <> header "fig-bus - a pub/sub message bus" ) - Fig.Bus.main (Just opts.host, opts.port) + -- Fig.Bus.main (Just opts.host, opts.port) + Fig.Bus.Binary.main (Just opts.host, opts.port) diff --git a/fig-bus/src/Fig/Bus/Binary.hs b/fig-bus/src/Fig/Bus/Binary.hs new file mode 100644 index 0000000..6329d01 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary.hs @@ -0,0 +1,108 @@ +module Fig.Bus.Binary (main) where + +import Fig.Prelude + +import Control.Monad (when) +import Control.Concurrent.MVar as MVar + +import Data.Word (Word8, Word32) +import Data.Bits ((.|.), (.&.), shiftL, shiftR) +import qualified Data.List as List +import Data.ByteString (hPut, hGet) +import qualified Data.ByteString as BS +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.IORef as IORef + +import Fig.Utils.Net + +newtype EventType = EventType ByteString + deriving (Show, Eq, Ord) + +newtype BusState = BusState + { subscriptions :: Map EventType [Handle] + } + +subscribe :: EventType -> Handle -> BusState -> BusState +subscribe ev h bs = bs + { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions + } + +unsubscribe :: EventType -> Handle -> BusState -> BusState +unsubscribe ev h bs = bs + { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions + } + +intFromLEBytes :: [Word8] -> Int +intFromLEBytes [] = 0 +intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x + +readLengthPrefixed :: Handle -> IO (Maybe ByteString) +readLengthPrefixed h = do + n <- hGet h 4 + log $ "parsing: " <> tshow n + case intFromLEBytes (BS.unpack n) of + 0 -> pure Nothing + len -> do + log $ "reading: " <> tshow len + x <- hGet h len + pure $ Just x + +readEvent :: Handle -> IO (Maybe EventType) +readEvent h = do + mb <- readLengthPrefixed h + pure $ EventType <$> mb + +writeLengthPrefixed :: Handle -> ByteString -> IO () +writeLengthPrefixed h d = do + let l :: Word32 = fromIntegral $ BS.length d + let bytes = + [ fromIntegral $ l .&. 0xff + , fromIntegral $ shiftR l 8 .&. 0xff + , fromIntegral $ shiftR l 16 .&. 0xff + , fromIntegral $ shiftR l 24 .&. 0xff + ] + hPut h $ BS.pack bytes <> d + +writeEvent :: Handle -> EventType -> IO () +writeEvent h (EventType d) = writeLengthPrefixed h d + +publish :: EventType -> ByteString -> BusState -> IO () +publish ev d bs = + case Map.lookup ev bs.subscriptions of + Nothing -> pure () + Just hs -> forM_ hs \h -> do + writeEvent h ev + writeLengthPrefixed h d + +main :: (Maybe Text, Text) -> IO () +main bind = do + st <- MVar.newMVar $ BusState { subscriptions = Map.empty } + server bind do + subs <- IORef.newIORef ([] :: [EventType]) + pure \h peer -> + ( do + let + go = do + c <- hGet h 1 + when (BS.length c == 1) do + case BS.head c of + 115 -> readEvent h >>= \case + Just ev -> do + log $ tshow peer <> " subscribing to: " <> tshow ev + IORef.modifyIORef' subs (ev:) + MVar.modifyMVar_ st (pure . subscribe ev h) + go + _ -> log "malformed subscription" + 112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case + (Just ev, Just d) -> do + log $ tshow peer <> " publishing to: " <> tshow ev + publish ev d =<< MVar.readMVar st + go + _ -> log "malformed publish" + _ -> log "unknown" + go + , do + ss <- IORef.readIORef subs + MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss + ) diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs new file mode 100644 index 0000000..a41f300 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -0,0 +1,69 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.Binary.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc +import qualified Control.Concurrent.Async as Async + +import Data.ByteString (hPut, hGet) + +import Fig.Utils.Net +import Fig.Bus.Binary + +data Commands m = Commands + { subscribe :: EventType -> m () + , publish :: EventType -> ByteString -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + cmds = Commands + { subscribe = \(EventType ev) -> do + hPut h "s" + writeLengthPrefixed h ev + , publish = \(EventType ev) d -> do + hPut h "p" + writeLengthPrefixed h ev + writeLengthPrefixed h d + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> log $ "Received: " <> pretty d) + (pure ()) diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus/SExp.hs index 2102864..ddd2896 100644 --- a/fig-bus/src/Fig/Bus.hs +++ b/fig-bus/src/Fig/Bus/SExp.hs @@ -1,4 +1,4 @@ -module Fig.Bus (main) where +module Fig.Bus.SExp (main) where import Fig.Prelude diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/SExp/Client.hs index d2c6b14..780f78f 100644 --- a/fig-bus/src/Fig/Bus/Client.hs +++ b/fig-bus/src/Fig/Bus/SExp/Client.hs @@ -1,6 +1,6 @@ {-# Language QuasiQuotes #-} -module Fig.Bus.Client (Commands(..), busClient) where +module Fig.Bus.SExp.Client (Commands(..), busClient) where import Fig.Prelude |
