summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig
diff options
context:
space:
mode:
authorLLLL Colonq <llll@colonq>2025-05-06 01:52:24 -0400
committerLLLL Colonq <llll@colonq>2025-05-06 01:52:24 -0400
commit2bac23772ea3b8e95e27bcd4f8d9c4d91538f840 (patch)
tree691785e14429c6a70b827d87e95b1a9847307a6b /fig-bus/src/Fig
parenta8eaa15e20779320eafc2e70093a3dd632da01ac (diff)
Binary bus
Diffstat (limited to 'fig-bus/src/Fig')
-rw-r--r--fig-bus/src/Fig/Bus/Binary.hs108
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Client.hs69
-rw-r--r--fig-bus/src/Fig/Bus/SExp.hs (renamed from fig-bus/src/Fig/Bus.hs)2
-rw-r--r--fig-bus/src/Fig/Bus/SExp/Client.hs (renamed from fig-bus/src/Fig/Bus/Client.hs)2
4 files changed, 179 insertions, 2 deletions
diff --git a/fig-bus/src/Fig/Bus/Binary.hs b/fig-bus/src/Fig/Bus/Binary.hs
new file mode 100644
index 0000000..6329d01
--- /dev/null
+++ b/fig-bus/src/Fig/Bus/Binary.hs
@@ -0,0 +1,108 @@
+module Fig.Bus.Binary (main) where
+
+import Fig.Prelude
+
+import Control.Monad (when)
+import Control.Concurrent.MVar as MVar
+
+import Data.Word (Word8, Word32)
+import Data.Bits ((.|.), (.&.), shiftL, shiftR)
+import qualified Data.List as List
+import Data.ByteString (hPut, hGet)
+import qualified Data.ByteString as BS
+import Data.Map.Strict (Map)
+import qualified Data.Map.Strict as Map
+import qualified Data.IORef as IORef
+
+import Fig.Utils.Net
+
+newtype EventType = EventType ByteString
+ deriving (Show, Eq, Ord)
+
+newtype BusState = BusState
+ { subscriptions :: Map EventType [Handle]
+ }
+
+subscribe :: EventType -> Handle -> BusState -> BusState
+subscribe ev h bs = bs
+ { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions
+ }
+
+unsubscribe :: EventType -> Handle -> BusState -> BusState
+unsubscribe ev h bs = bs
+ { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions
+ }
+
+intFromLEBytes :: [Word8] -> Int
+intFromLEBytes [] = 0
+intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x
+
+readLengthPrefixed :: Handle -> IO (Maybe ByteString)
+readLengthPrefixed h = do
+ n <- hGet h 4
+ log $ "parsing: " <> tshow n
+ case intFromLEBytes (BS.unpack n) of
+ 0 -> pure Nothing
+ len -> do
+ log $ "reading: " <> tshow len
+ x <- hGet h len
+ pure $ Just x
+
+readEvent :: Handle -> IO (Maybe EventType)
+readEvent h = do
+ mb <- readLengthPrefixed h
+ pure $ EventType <$> mb
+
+writeLengthPrefixed :: Handle -> ByteString -> IO ()
+writeLengthPrefixed h d = do
+ let l :: Word32 = fromIntegral $ BS.length d
+ let bytes =
+ [ fromIntegral $ l .&. 0xff
+ , fromIntegral $ shiftR l 8 .&. 0xff
+ , fromIntegral $ shiftR l 16 .&. 0xff
+ , fromIntegral $ shiftR l 24 .&. 0xff
+ ]
+ hPut h $ BS.pack bytes <> d
+
+writeEvent :: Handle -> EventType -> IO ()
+writeEvent h (EventType d) = writeLengthPrefixed h d
+
+publish :: EventType -> ByteString -> BusState -> IO ()
+publish ev d bs =
+ case Map.lookup ev bs.subscriptions of
+ Nothing -> pure ()
+ Just hs -> forM_ hs \h -> do
+ writeEvent h ev
+ writeLengthPrefixed h d
+
+main :: (Maybe Text, Text) -> IO ()
+main bind = do
+ st <- MVar.newMVar $ BusState { subscriptions = Map.empty }
+ server bind do
+ subs <- IORef.newIORef ([] :: [EventType])
+ pure \h peer ->
+ ( do
+ let
+ go = do
+ c <- hGet h 1
+ when (BS.length c == 1) do
+ case BS.head c of
+ 115 -> readEvent h >>= \case
+ Just ev -> do
+ log $ tshow peer <> " subscribing to: " <> tshow ev
+ IORef.modifyIORef' subs (ev:)
+ MVar.modifyMVar_ st (pure . subscribe ev h)
+ go
+ _ -> log "malformed subscription"
+ 112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case
+ (Just ev, Just d) -> do
+ log $ tshow peer <> " publishing to: " <> tshow ev
+ publish ev d =<< MVar.readMVar st
+ go
+ _ -> log "malformed publish"
+ _ -> log "unknown"
+ go
+ , do
+ ss <- IORef.readIORef subs
+ MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss
+ )
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs
new file mode 100644
index 0000000..a41f300
--- /dev/null
+++ b/fig-bus/src/Fig/Bus/Binary/Client.hs
@@ -0,0 +1,69 @@
+{-# Language QuasiQuotes #-}
+
+module Fig.Bus.Binary.Client (Commands(..), busClient) where
+
+import Fig.Prelude
+
+import System.Exit (exitFailure)
+
+import qualified Control.Concurrent as Conc
+import qualified Control.Concurrent.Async as Async
+
+import Data.ByteString (hPut, hGet)
+
+import Fig.Utils.Net
+import Fig.Bus.Binary
+
+data Commands m = Commands
+ { subscribe :: EventType -> m ()
+ , publish :: EventType -> ByteString -> m ()
+ }
+
+newtype FigBusClientException = FigBusClientException Text
+ deriving (Show, Eq, Ord)
+instance Exception FigBusClientException
+
+busClient :: forall m.
+ (MonadIO m, MonadThrow m, MonadMask m) =>
+ (Text, Text) ->
+ (Commands IO -> IO ()) ->
+ (Commands IO -> SExpr -> IO ()) ->
+ IO () ->
+ m ()
+busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
+ let
+ cmds = Commands
+ { subscribe = \(EventType ev) -> do
+ hPut h "s"
+ writeLengthPrefixed h ev
+ , publish = \(EventType ev) d -> do
+ hPut h "p"
+ writeLengthPrefixed h ev
+ writeLengthPrefixed h d
+ }
+ in
+ ( do
+ liftIO $ Async.concurrently_ (onConn cmds) do
+ forever do
+
+ line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h)
+ case parseSExpr line of
+ Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line
+ Just x -> liftIO $ onData cmds x
+ , liftIO onQuit
+ )
+ where
+ catchFailure body = catch body \(e :: IOException) -> do
+ log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e
+ liftIO exitFailure
+
+_testClient :: IO ()
+_testClient = busClient ("localhost", "32050")
+ (\cmds -> do
+ cmds.subscribe [sexp|foo|]
+ forever do
+ Conc.threadDelay 1000000
+ cmds.publish [sexp|bar|] [[sexp|42|]]
+ )
+ (\_cmds d -> log $ "Received: " <> pretty d)
+ (pure ())
diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus/SExp.hs
index 2102864..ddd2896 100644
--- a/fig-bus/src/Fig/Bus.hs
+++ b/fig-bus/src/Fig/Bus/SExp.hs
@@ -1,4 +1,4 @@
-module Fig.Bus (main) where
+module Fig.Bus.SExp (main) where
import Fig.Prelude
diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/SExp/Client.hs
index d2c6b14..780f78f 100644
--- a/fig-bus/src/Fig/Bus/Client.hs
+++ b/fig-bus/src/Fig/Bus/SExp/Client.hs
@@ -1,6 +1,6 @@
{-# Language QuasiQuotes #-}
-module Fig.Bus.Client (Commands(..), busClient) where
+module Fig.Bus.SExp.Client (Commands(..), busClient) where
import Fig.Prelude