From d642561b841822829fc18a9225f46c7bba21e312 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Tue, 16 Sep 2025 03:08:39 -0400 Subject: Fix bus race condition --- fig-bus/src/Fig/Bus/Binary/Client.hs | 47 +++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs index e1226d8..39798cd 100644 --- a/fig-bus/src/Fig/Bus/Binary/Client.hs +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -6,6 +6,7 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as Conc import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent.MVar as MVar import Data.ByteString (hPut) @@ -28,26 +29,32 @@ busClient :: forall m. (Commands IO -> ByteString -> ByteString -> IO ()) -> IO () -> m () -busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> - let - cmds = Commands - { subscribe = \ev -> do - hPut h "s" - writeLengthPrefixed h ev - , publish = \ev d -> do - hPut h "p" - writeLengthPrefixed h ev - writeLengthPrefixed h d - } - in - ( do - liftIO $ Async.concurrently_ (onConn cmds) do - forever do - (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case - (Just ev, Just d) -> liftIO $ onData cmds ev d - _else -> throwM . FigBusClientException $ "Connection to bus closed" - , liftIO onQuit - ) +busClient loc@(host, port) onConn onData onQuit = do + lock <- liftIO $ MVar.newMVar () + catchFailure . client loc $ pure \h -> + let + cmds = Commands + { subscribe = \ev -> do + () <- MVar.takeMVar lock + hPut h "s" + writeLengthPrefixed h ev + MVar.putMVar lock () + , publish = \ev d -> do + () <- MVar.takeMVar lock + hPut h "p" + writeLengthPrefixed h ev + writeLengthPrefixed h d + MVar.putMVar lock () + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case + (Just ev, Just d) -> liftIO $ onData cmds ev d + _else -> throwM . FigBusClientException $ "Connection to bus closed" + , liftIO onQuit + ) where catchFailure body = catch body \(e :: IOException) -> do log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e -- cgit v1.2.3