summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig/Bus/Binary
diff options
context:
space:
mode:
authorLLLL Colonq <llll@colonq>2025-05-06 03:45:51 -0400
committerLLLL Colonq <llll@colonq>2025-05-06 03:45:51 -0400
commitf010327cd1b9fd3b260ef1623d252723f395fbc7 (patch)
treee1d88e671d6e909e3986342807a71e55a86df6a8 /fig-bus/src/Fig/Bus/Binary
parent2bac23772ea3b8e95e27bcd4f8d9c4d91538f840 (diff)
Binary message bus
Diffstat (limited to 'fig-bus/src/Fig/Bus/Binary')
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Client.hs30
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Utils.hs44
2 files changed, 57 insertions, 17 deletions
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs
index a41f300..a2e601f 100644
--- a/fig-bus/src/Fig/Bus/Binary/Client.hs
+++ b/fig-bus/src/Fig/Bus/Binary/Client.hs
@@ -1,5 +1,3 @@
-{-# Language QuasiQuotes #-}
-
module Fig.Bus.Binary.Client (Commands(..), busClient) where
import Fig.Prelude
@@ -9,14 +7,14 @@ import System.Exit (exitFailure)
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
-import Data.ByteString (hPut, hGet)
+import Data.ByteString (hPut)
import Fig.Utils.Net
-import Fig.Bus.Binary
+import Fig.Bus.Binary.Utils
data Commands m = Commands
- { subscribe :: EventType -> m ()
- , publish :: EventType -> ByteString -> m ()
+ { subscribe :: !(ByteString -> m ())
+ , publish :: !(ByteString -> ByteString -> m ())
}
newtype FigBusClientException = FigBusClientException Text
@@ -27,16 +25,16 @@ busClient :: forall m.
(MonadIO m, MonadThrow m, MonadMask m) =>
(Text, Text) ->
(Commands IO -> IO ()) ->
- (Commands IO -> SExpr -> IO ()) ->
+ (Commands IO -> ByteString -> ByteString -> IO ()) ->
IO () ->
m ()
busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
let
cmds = Commands
- { subscribe = \(EventType ev) -> do
+ { subscribe = \ev -> do
hPut h "s"
writeLengthPrefixed h ev
- , publish = \(EventType ev) d -> do
+ , publish = \ev d -> do
hPut h "p"
writeLengthPrefixed h ev
writeLengthPrefixed h d
@@ -45,11 +43,9 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu
( do
liftIO $ Async.concurrently_ (onConn cmds) do
forever do
-
- line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h)
- case parseSExpr line of
- Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line
- Just x -> liftIO $ onData cmds x
+ (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
+ (Just ev, Just d) -> liftIO $ onData cmds ev d
+ _else -> throwM . FigBusClientException $ "Server sent malformed data"
, liftIO onQuit
)
where
@@ -60,10 +56,10 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu
_testClient :: IO ()
_testClient = busClient ("localhost", "32050")
(\cmds -> do
- cmds.subscribe [sexp|foo|]
+ cmds.subscribe "foo"
forever do
Conc.threadDelay 1000000
- cmds.publish [sexp|bar|] [[sexp|42|]]
+ cmds.publish "bar" "42"
)
- (\_cmds d -> log $ "Received: " <> pretty d)
+ (\_cmds ev d -> log $ "Received: " <> tshow (ev, d))
(pure ())
diff --git a/fig-bus/src/Fig/Bus/Binary/Utils.hs b/fig-bus/src/Fig/Bus/Binary/Utils.hs
new file mode 100644
index 0000000..734c7c9
--- /dev/null
+++ b/fig-bus/src/Fig/Bus/Binary/Utils.hs
@@ -0,0 +1,44 @@
+module Fig.Bus.Binary.Utils where
+
+import Fig.Prelude
+
+import Data.Word (Word8, Word32)
+import Data.Bits ((.|.), (.&.), shiftL, shiftR)
+import Data.ByteString (hPut, hGet)
+import qualified Data.ByteString as BS
+
+newtype EventType = EventType ByteString
+ deriving (Show, Eq, Ord)
+
+intFromLEBytes :: [Word8] -> Int
+intFromLEBytes [] = 0
+intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x
+
+readLengthPrefixed :: Handle -> IO (Maybe ByteString)
+readLengthPrefixed h = do
+ n <- hGet h 4
+ case intFromLEBytes (BS.unpack n) of
+ 0 -> pure Nothing
+ len -> do
+ x <- hGet h len
+ pure $ Just x
+
+
+readEvent :: Handle -> IO (Maybe EventType)
+readEvent h = do
+ mb <- readLengthPrefixed h
+ pure $ EventType <$> mb
+
+writeLengthPrefixed :: Handle -> ByteString -> IO ()
+writeLengthPrefixed h d = do
+ let l :: Word32 = fromIntegral $ BS.length d
+ let bytes =
+ [ fromIntegral $ l .&. 0xff
+ , fromIntegral $ shiftR l 8 .&. 0xff
+ , fromIntegral $ shiftR l 16 .&. 0xff
+ , fromIntegral $ shiftR l 24 .&. 0xff
+ ]
+ hPut h $ BS.pack bytes <> d
+
+writeEvent :: Handle -> EventType -> IO ()
+writeEvent h (EventType d) = writeLengthPrefixed h d