summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig/Bus/Binary
diff options
context:
space:
mode:
Diffstat (limited to 'fig-bus/src/Fig/Bus/Binary')
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Client.hs47
1 files changed, 27 insertions, 20 deletions
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs
index e1226d8..39798cd 100644
--- a/fig-bus/src/Fig/Bus/Binary/Client.hs
+++ b/fig-bus/src/Fig/Bus/Binary/Client.hs
@@ -6,6 +6,7 @@ import System.Exit (exitFailure)
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
+import qualified Control.Concurrent.MVar as MVar
import Data.ByteString (hPut)
@@ -28,26 +29,32 @@ busClient :: forall m.
(Commands IO -> ByteString -> ByteString -> IO ()) ->
IO () ->
m ()
-busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
- let
- cmds = Commands
- { subscribe = \ev -> do
- hPut h "s"
- writeLengthPrefixed h ev
- , publish = \ev d -> do
- hPut h "p"
- writeLengthPrefixed h ev
- writeLengthPrefixed h d
- }
- in
- ( do
- liftIO $ Async.concurrently_ (onConn cmds) do
- forever do
- (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
- (Just ev, Just d) -> liftIO $ onData cmds ev d
- _else -> throwM . FigBusClientException $ "Connection to bus closed"
- , liftIO onQuit
- )
+busClient loc@(host, port) onConn onData onQuit = do
+ lock <- liftIO $ MVar.newMVar ()
+ catchFailure . client loc $ pure \h ->
+ let
+ cmds = Commands
+ { subscribe = \ev -> do
+ () <- MVar.takeMVar lock
+ hPut h "s"
+ writeLengthPrefixed h ev
+ MVar.putMVar lock ()
+ , publish = \ev d -> do
+ () <- MVar.takeMVar lock
+ hPut h "p"
+ writeLengthPrefixed h ev
+ writeLengthPrefixed h d
+ MVar.putMVar lock ()
+ }
+ in
+ ( do
+ liftIO $ Async.concurrently_ (onConn cmds) do
+ forever do
+ (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
+ (Just ev, Just d) -> liftIO $ onData cmds ev d
+ _else -> throwM . FigBusClientException $ "Connection to bus closed"
+ , liftIO onQuit
+ )
where
catchFailure body = catch body \(e :: IOException) -> do
log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e