summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig
diff options
context:
space:
mode:
Diffstat (limited to 'fig-bus/src/Fig')
-rw-r--r--fig-bus/src/Fig/Bus.hs62
-rw-r--r--fig-bus/src/Fig/Bus/Client.hs65
2 files changed, 127 insertions, 0 deletions
diff --git a/fig-bus/src/Fig/Bus.hs b/fig-bus/src/Fig/Bus.hs
new file mode 100644
index 0000000..2102864
--- /dev/null
+++ b/fig-bus/src/Fig/Bus.hs
@@ -0,0 +1,62 @@
+module Fig.Bus (main) where
+
+import Fig.Prelude
+
+import Control.Concurrent.MVar as MVar
+
+import qualified Data.List as List
+import Data.ByteString (hPut, hGetLine)
+import Data.Map.Strict (Map)
+import qualified Data.Map.Strict as Map
+import qualified Data.IORef as IORef
+
+import Fig.Utils.SExpr
+import Fig.Utils.Net
+
+newtype BusState = BusState
+ { subscriptions :: Map SExpr [Handle]
+ }
+
+subscribe :: SExpr -> Handle -> BusState -> BusState
+subscribe ev h bs = bs
+ { subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions
+ }
+
+unsubscribe :: SExpr -> Handle -> BusState -> BusState
+unsubscribe ev h bs = bs
+ { subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions
+ }
+
+publish :: SExpr -> [SExpr] -> BusState -> IO ()
+publish ev d bs =
+ case Map.lookup ev bs.subscriptions of
+ Nothing -> pure ()
+ Just hs -> forM_ hs \h -> do
+ hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n"
+
+main :: (Maybe Text, Text) -> IO ()
+main bind = do
+ st <- MVar.newMVar $ BusState { subscriptions = Map.empty }
+ server bind do
+ subs <- IORef.newIORef ([] :: [SExpr])
+ pure \h peer ->
+ ( do
+ forever do
+ line <- throwLeft id . decodeUtf8' =<< hGetLine h
+ case parseSExpr line of
+ Just (SExprList (SExprSymbol "ping":_)) -> do
+ log $ tshow peer <> " pinged"
+ hPut h . encodeUtf8 $ "(pong)\n"
+ Just (SExprList [SExprSymbol "sub", ev]) -> do
+ log $ tshow peer <> " subscribing to: " <> pretty ev
+ IORef.modifyIORef' subs (ev:)
+ MVar.modifyMVar_ st (pure . subscribe ev h)
+ Just (SExprList (SExprSymbol "pub":ev:d)) -> do
+ log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev
+ publish ev d =<< MVar.readMVar st
+ Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x
+ Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line
+ , do
+ ss <- IORef.readIORef subs
+ MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss
+ )
diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/Client.hs
new file mode 100644
index 0000000..6d72ad4
--- /dev/null
+++ b/fig-bus/src/Fig/Bus/Client.hs
@@ -0,0 +1,65 @@
+{-# Language QuasiQuotes #-}
+
+module Fig.Bus.Client (Commands(..), busClient) where
+
+import Fig.Prelude
+
+import System.Exit (exitFailure)
+
+import qualified Control.Concurrent as Conc
+
+import Data.ByteString (hPut, hGetLine)
+
+import Fig.Utils.Net
+import Fig.Utils.SExpr
+
+data Commands m = Commands
+ { ping :: m ()
+ , subscribe :: SExpr -> m ()
+ , publish :: SExpr -> [SExpr] -> m ()
+ }
+
+newtype FigBusClientException = FigBusClientException Text
+ deriving (Show, Eq, Ord)
+instance Exception FigBusClientException
+
+busClient :: forall m.
+ (MonadIO m, MonadThrow m, MonadMask m) =>
+ (Text, Text) ->
+ (Commands IO -> IO ()) ->
+ (Commands IO -> SExpr -> IO ()) ->
+ IO () ->
+ m ()
+busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
+ let
+ sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n"
+ cmds = Commands
+ { ping = sendSexpr [sexp|(ping)|]
+ , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|]
+ , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|]
+ }
+ in
+ ( do
+ liftIO . void . Conc.forkIO $ onConn cmds
+ forever do
+ line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h)
+ case parseSExpr line of
+ Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line
+ Just x -> liftIO $ onData cmds x
+ , liftIO onQuit
+ )
+ where
+ catchFailure body = catch body \(e :: IOException) -> do
+ log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e
+ liftIO $ exitFailure
+
+_testClient :: IO ()
+_testClient = busClient ("localhost", "32050")
+ (\cmds -> do
+ cmds.subscribe [sexp|foo|]
+ forever do
+ Conc.threadDelay 1000000
+ cmds.publish [sexp|bar|] [[sexp|42|]]
+ )
+ (\_cmds d -> putStrLn $ "Received: " <> pretty d)
+ (pure ())