diff options
| author | LLLL Colonq <llll@colonq> | 2023-11-16 19:06:43 -0500 |
|---|---|---|
| committer | LLLL Colonq <llll@colonq> | 2023-11-16 19:06:43 -0500 |
| commit | dcef0b65069fb38fd0f6c4382353167f603ebff1 (patch) | |
| tree | 45954ffe308c3dd056e6af4f734e6d2af89e5856 /fig-bus/src/Fig/Bus | |
Initial commit
Diffstat (limited to 'fig-bus/src/Fig/Bus')
| -rw-r--r-- | fig-bus/src/Fig/Bus/Client.hs | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/fig-bus/src/Fig/Bus/Client.hs b/fig-bus/src/Fig/Bus/Client.hs new file mode 100644 index 0000000..6d72ad4 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Client.hs @@ -0,0 +1,65 @@ +{-# Language QuasiQuotes #-} + +module Fig.Bus.Client (Commands(..), busClient) where + +import Fig.Prelude + +import System.Exit (exitFailure) + +import qualified Control.Concurrent as Conc + +import Data.ByteString (hPut, hGetLine) + +import Fig.Utils.Net +import Fig.Utils.SExpr + +data Commands m = Commands + { ping :: m () + , subscribe :: SExpr -> m () + , publish :: SExpr -> [SExpr] -> m () + } + +newtype FigBusClientException = FigBusClientException Text + deriving (Show, Eq, Ord) +instance Exception FigBusClientException + +busClient :: forall m. + (MonadIO m, MonadThrow m, MonadMask m) => + (Text, Text) -> + (Commands IO -> IO ()) -> + (Commands IO -> SExpr -> IO ()) -> + IO () -> + m () +busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO . void . Conc.forkIO $ onConn cmds + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) + where + catchFailure body = catch body \(e :: IOException) -> do + log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e + liftIO $ exitFailure + +_testClient :: IO () +_testClient = busClient ("localhost", "32050") + (\cmds -> do + cmds.subscribe [sexp|foo|] + forever do + Conc.threadDelay 1000000 + cmds.publish [sexp|bar|] [[sexp|42|]] + ) + (\_cmds d -> putStrLn $ "Received: " <> pretty d) + (pure ()) |
