summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig/Bus/SExpr/Client.hs
diff options
context:
space:
mode:
Diffstat (limited to 'fig-bus/src/Fig/Bus/SExpr/Client.hs')
-rw-r--r--fig-bus/src/Fig/Bus/SExpr/Client.hs39
1 files changed, 21 insertions, 18 deletions
diff --git a/fig-bus/src/Fig/Bus/SExpr/Client.hs b/fig-bus/src/Fig/Bus/SExpr/Client.hs
index ccd41a7..bdb6a3e 100644
--- a/fig-bus/src/Fig/Bus/SExpr/Client.hs
+++ b/fig-bus/src/Fig/Bus/SExpr/Client.hs
@@ -8,6 +8,7 @@ import System.Exit (exitFailure)
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
+import qualified Control.Concurrent.MVar as MVar
import Data.ByteString (hPut, hGetLine)
@@ -31,24 +32,26 @@ busClient :: forall m.
(Commands IO -> SExpr -> IO ()) ->
IO () ->
m ()
-busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
- let
- sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n"
- cmds = Commands
- { ping = sendSexpr [sexp|(ping)|]
- , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|]
- , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|]
- }
- in
- ( do
- liftIO $ Async.concurrently_ (onConn cmds) do
- forever do
- line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h)
- case parseSExpr line of
- Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line
- Just x -> liftIO $ onData cmds x
- , liftIO onQuit
- )
+busClient loc@(host, port) onConn onData onQuit = do
+ lock <- liftIO $ MVar.newMVar ()
+ catchFailure . client loc $ pure \h ->
+ let
+ sendSexpr x = liftIO $ MVar.withMVar lock \_ -> hPut h . encodeUtf8 $ pretty x <> "\n"
+ cmds = Commands
+ { ping = sendSexpr [sexp|(ping)|]
+ , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|]
+ , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|]
+ }
+ in
+ ( do
+ liftIO $ Async.concurrently_ (onConn cmds) do
+ forever do
+ line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h)
+ case parseSExpr line of
+ Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line
+ Just x -> liftIO $ onData cmds x
+ , liftIO onQuit
+ )
where
catchFailure body = catch body \(e :: IOException) -> do
log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e