1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
module Fig.Bus.Binary.Client (Commands(..), busClient) where
import Fig.Prelude
import System.Exit (exitFailure)
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.MVar as MVar
import Data.ByteString (hPut)
import Fig.Utils.Net
import Fig.Bus.Binary.Utils
-- | The operations a connected bus client hands to user callbacks,
-- parameterised over the monad @m@ in which they run.
data Commands m = Commands
  { -- | Takes a single 'ByteString' (bound as the event name @ev@ in
    -- 'busClient') and issues a subscribe command for it.
    subscribe :: !(ByteString -> m ())
  , -- | Takes two 'ByteString's (bound as the event name @ev@ and the
    -- payload @d@ in 'busClient') and issues a publish command.
    publish :: !(ByteString -> ByteString -> m ())
  }
-- | Exception raised by the bus client itself, carrying a human-readable
-- message. 'busClient' throws it with the text @"Connection to bus closed"@
-- when a read from the bus does not yield a complete (event, data) pair.
newtype FigBusClientException = FigBusClientException Text
  deriving (Show, Eq, Ord)

instance Exception FigBusClientException
-- | Connect to the message bus at the given @(host, port)@ and run a client
-- session on the resulting handle.
--
-- Arguments, in order:
--
-- * the @(host, port)@ pair, passed on to 'client' and used in the failure
--   log message;
-- * @onConn@: run once with the 'Commands' for this connection,
--   concurrently with the receive loop;
-- * @onData@: called with the 'Commands', the event and the payload for
--   every message read from the handle;
-- * @onQuit@: supplied to 'client' as the second component of the action
--   pair (presumably run on shutdown; confirm against "Fig.Utils.Net").
--
-- The receive loop throws 'FigBusClientException' when either of the two
-- 'readLengthPrefixed' calls returns 'Nothing'. An 'IOException' escaping
-- the 'client' call is logged as a connection failure and the process
-- exits via 'exitFailure'.
busClient :: forall m.
  (MonadIO m, MonadThrow m, MonadMask m) =>
  (Text, Text) ->
  (Commands IO -> IO ()) ->
  (Commands IO -> ByteString -> ByteString -> IO ()) ->
  IO () ->
  m ()
busClient loc@(host, port) onConn onData onQuit = do
  lock <- liftIO $ MVar.newMVar ()
  catchFailure . client loc $ pure \h ->
    let
      -- Run a write sequence while holding the lock, so that commands issued
      -- from different threads are never interleaved on the handle.
      -- 'MVar.withMVar' releases the lock even when the write throws (or the
      -- thread is killed); a bare takeMVar/putMVar pair would leave the lock
      -- held forever and deadlock every later 'subscribe' / 'publish'.
      locked :: IO () -> IO ()
      locked act = MVar.withMVar lock \() -> act
      cmds = Commands
        { -- Wire format: the tag byte "s", then the event name.
          subscribe = \ev -> locked do
            hPut h "s"
            writeLengthPrefixed h ev
        , -- Wire format: the tag byte "p", then the event name, then the data.
          publish = \ev d -> locked do
            hPut h "p"
            writeLengthPrefixed h ev
            writeLengthPrefixed h d
        }
    in
      ( do
          -- The user's connection callback runs alongside the receive loop;
          -- 'Async.concurrently_' propagates an exception from either side.
          liftIO $ Async.concurrently_ (onConn cmds) do
            forever do
              -- Each incoming message is two consecutive reads: event, data.
              (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
                (Just ev, Just d) -> liftIO $ onData cmds ev d
                _else -> throwM . FigBusClientException $ "Connection to bus closed"
      , liftIO onQuit
      )
  where
    -- Log any 'IOException' raised by the wrapped action as a connection
    -- failure and terminate the process with a failing exit code.
    catchFailure body = catch body \(e :: IOException) -> do
      log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e
      liftIO exitFailure
-- | Manual smoke test: connects to a bus on @localhost:32050@, subscribes to
-- @"foo"@, publishes @"42"@ on @"bar"@ after every one-second delay, and
-- logs every message it receives.
_testClient :: IO ()
_testClient = busClient ("localhost", "32050") onConnect onMessage (pure ())
  where
    -- Subscribe once, then publish forever at a fixed interval.
    onConnect :: Commands IO -> IO ()
    onConnect cmds = do
      cmds.subscribe "foo"
      forever do
        Conc.threadDelay 1000000 -- microseconds, i.e. one second
        cmds.publish "bar" "42"

    -- Log each incoming (event, data) pair; the commands are not needed.
    onMessage :: Commands IO -> ByteString -> ByteString -> IO ()
    onMessage _cmds ev d = log $ "Received: " <> tshow (ev, d)
|