summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig/Bus/Binary/Client.hs
diff options
context:
space:
mode:
authorLLLL Colonq <llll@colonq>2025-09-16 03:08:39 -0400
committerLLLL Colonq <llll@colonq>2025-09-16 03:08:39 -0400
commitd642561b841822829fc18a9225f46c7bba21e312 (patch)
treeff6464d758bc4eb1d54132ca41fc73c81dfc7ae2 /fig-bus/src/Fig/Bus/Binary/Client.hs
parenta421eb9bdddfa7e2765456f756833d8941ac7a08 (diff)
Fix bus race condition
Diffstat (limited to 'fig-bus/src/Fig/Bus/Binary/Client.hs')
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Client.hs47
1 files changed, 27 insertions, 20 deletions
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs
index e1226d8..39798cd 100644
--- a/fig-bus/src/Fig/Bus/Binary/Client.hs
+++ b/fig-bus/src/Fig/Bus/Binary/Client.hs
@@ -6,6 +6,7 @@ import System.Exit (exitFailure)
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
+import qualified Control.Concurrent.MVar as MVar
import Data.ByteString (hPut)
@@ -28,26 +29,32 @@ busClient :: forall m.
(Commands IO -> ByteString -> ByteString -> IO ()) ->
IO () ->
m ()
-busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
- let
- cmds = Commands
- { subscribe = \ev -> do
- hPut h "s"
- writeLengthPrefixed h ev
- , publish = \ev d -> do
- hPut h "p"
- writeLengthPrefixed h ev
- writeLengthPrefixed h d
- }
- in
- ( do
- liftIO $ Async.concurrently_ (onConn cmds) do
- forever do
- (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
- (Just ev, Just d) -> liftIO $ onData cmds ev d
- _else -> throwM . FigBusClientException $ "Connection to bus closed"
- , liftIO onQuit
- )
+busClient loc@(host, port) onConn onData onQuit = do
+ lock <- liftIO $ MVar.newMVar ()
+ catchFailure . client loc $ pure \h ->
+ let
+ cmds = Commands
+ { subscribe = \ev -> do
+ () <- MVar.takeMVar lock
+ hPut h "s"
+ writeLengthPrefixed h ev
+ MVar.putMVar lock ()
+ , publish = \ev d -> do
+ () <- MVar.takeMVar lock
+ hPut h "p"
+ writeLengthPrefixed h ev
+ writeLengthPrefixed h d
+ MVar.putMVar lock ()
+ }
+ in
+ ( do
+ liftIO $ Async.concurrently_ (onConn cmds) do
+ forever do
+ (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
+ (Just ev, Just d) -> liftIO $ onData cmds ev d
+ _else -> throwM . FigBusClientException $ "Connection to bus closed"
+ , liftIO onQuit
+ )
where
catchFailure body = catch body \(e :: IOException) -> do
log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e