diff options
| author | LLLL Colonq <llll@colonq> | 2025-09-16 03:24:26 -0400 |
|---|---|---|
| committer | LLLL Colonq <llll@colonq> | 2025-09-16 03:24:26 -0400 |
| commit | 664d91dcba96fed6a8cc39a26d81a159bab458ba (patch) | |
| tree | 330f43a1939409c89fe74efac6fa4d0c6f5e4190 | |
| parent | d642561b841822829fc18a9225f46c7bba21e312 (diff) | |
fig-bus: Fix race condition
| -rw-r--r-- | fig-bus/src/Fig/Bus/Binary/Client.hs | 16 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/SExpr/Client.hs | 39 |
2 files changed, 28 insertions, 27 deletions
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs index 39798cd..3ee9050 100644 --- a/fig-bus/src/Fig/Bus/Binary/Client.hs +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -35,16 +35,14 @@ busClient loc@(host, port) onConn onData onQuit = do let cmds = Commands { subscribe = \ev -> do - () <- MVar.takeMVar lock - hPut h "s" - writeLengthPrefixed h ev - MVar.putMVar lock () + MVar.withMVar lock \_ -> do + hPut h "s" + writeLengthPrefixed h ev , publish = \ev d -> do - () <- MVar.takeMVar lock - hPut h "p" - writeLengthPrefixed h ev - writeLengthPrefixed h d - MVar.putMVar lock () + MVar.withMVar lock \_ -> do + hPut h "p" + writeLengthPrefixed h ev + writeLengthPrefixed h d } in ( do diff --git a/fig-bus/src/Fig/Bus/SExpr/Client.hs b/fig-bus/src/Fig/Bus/SExpr/Client.hs index ccd41a7..bdb6a3e 100644 --- a/fig-bus/src/Fig/Bus/SExpr/Client.hs +++ b/fig-bus/src/Fig/Bus/SExpr/Client.hs @@ -8,6 +8,7 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as Conc import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent.MVar as MVar import Data.ByteString (hPut, hGetLine) @@ -31,24 +32,26 @@ busClient :: forall m. (Commands IO -> SExpr -> IO ()) -> IO () -> m () -busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> - let - sendSexpr x = liftIO . hPut h . encodeUtf8 $ pretty x <> "\n" - cmds = Commands - { ping = sendSexpr [sexp|(ping)|] - , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] - , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] - } - in - ( do - liftIO $ Async.concurrently_ (onConn cmds) do - forever do - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x - , liftIO onQuit - ) +busClient loc@(host, port) onConn onData onQuit = do + lock <- liftIO $ MVar.newMVar () + catchFailure . client loc $ pure \h -> + let + sendSexpr x = liftIO $ MVar.withMVar lock \_ -> hPut h . encodeUtf8 $ pretty x <> "\n" + cmds = Commands + { ping = sendSexpr [sexp|(ping)|] + , subscribe = \ev -> sendSexpr [sexp|(sub ,ev)|] + , publish = \ev d -> sendSexpr [sexp|(pub ,ev ,@d)|] + } + in + ( do + liftIO $ Async.concurrently_ (onConn cmds) do + forever do + line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) + case parseSExpr line of + Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line + Just x -> liftIO $ onData cmds x + , liftIO onQuit + ) where catchFailure body = catch body \(e :: IOException) -> do log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e |
