From 664d91dcba96fed6a8cc39a26d81a159bab458ba Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Tue, 16 Sep 2025 03:24:26 -0400 Subject: fig-bus: Fix race condition --- fig-bus/src/Fig/Bus/SExpr/Client.hs | 39 ++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 18 deletions(-) (limited to 'fig-bus/src/Fig/Bus/SExpr/Client.hs') diff --git a/fig-bus/src/Fig/Bus/SExpr/Client.hs b/fig-bus/src/Fig/Bus/SExpr/Client.hs index ccd41a7..bdb6a3e 100644 --- a/fig-bus/src/Fig/Bus/SExpr/Client.hs +++ b/fig-bus/src/Fig/Bus/SExpr/Client.hs @@ -8,6 +8,7 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as Conc import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent.MVar as MVar import Data.ByteString (hPut, hGetLine) @@ -31,24 +32,26 @@ busClient :: forall m. (Commands IO -> SExpr -> IO ()) -> IO () -> m () -busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> - let - sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" - cmds = Commands - { ping = sendSexpr [sexp|(ping)|] - , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] - , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] - } - in - ( do - liftIO $ Async.concurrently_ (onConn cmds) do - forever do - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x - , liftIO onQuit - ) +busClient loc@(host, port) onConn onData onQuit = do + lock <- liftIO $ MVar.newMVar () + catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO $ MVar.withMVar lock \_ -> hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) where catchFailure body = catch body \(e :: IOException) -> do log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e -- cgit v1.2.3