summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig/Bus/Binary/Client.hs
blob: 3ee9050d2f0f0e8d24bb06d983bf1049a960a83b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
module Fig.Bus.Binary.Client (Commands(..), busClient) where

import Fig.Prelude

import System.Exit (exitFailure)

import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.MVar as MVar

import Data.ByteString (hPut)

import Fig.Utils.Net
import Fig.Bus.Binary.Utils

-- | The operations a connected bus client can perform, parameterised over the
-- monad @m@ in which they run.
--
-- 'busClient' builds a @Commands IO@ for the live connection and hands it to
-- both of its callbacks.
data Commands m = Commands
  { subscribe :: !(ByteString -> m ())
    -- ^ Takes a single byte string. As built by 'busClient' this writes the
    -- tag @s@ followed by the argument, length-prefixed, to the connection.
    -- Presumably the argument is the name of the event to subscribe to;
    -- the bus-side meaning is not visible in this module.
  , publish :: !(ByteString -> ByteString -> m ())
    -- ^ Takes two byte strings. As built by 'busClient' this writes the tag
    -- @p@ followed by both arguments, each length-prefixed, to the
    -- connection. Presumably the first is the event name and the second its
    -- payload; the bus-side meaning is not visible in this module.
  }

-- | Exception type used by the bus client; it wraps a human-readable message.
--
-- 'busClient' throws it from its receive loop when it fails to read a complete
-- pair of length-prefixed values from the connection. The type is not part of
-- this module's export list.
newtype FigBusClientException = FigBusClientException Text
  deriving (Show, Eq, Ord)
-- Uses the default 'Exception' methods, which rely on the derived 'Show'.
instance Exception FigBusClientException

-- | Connect to the bus at the given @(host, port)@ and run a client session.
--
-- The location is passed to 'client' together with a handler that, given the
-- connection handle, yields a pair of actions: the session itself and
-- @onQuit@. How 'client' schedules that pair is defined in "Fig.Utils.Net".
--
-- During the session @onConn@ runs concurrently with a loop that repeatedly
-- reads two length-prefixed values from the handle and passes them to
-- @onData@. Both callbacks receive the same 'Commands' record, whose
-- operations write to that handle; a single 'MVar.MVar' is held around each
-- command so the writes making up one command are not interleaved with
-- another command's.
--
-- If either of the two reads yields no value, the loop throws
-- 'FigBusClientException'. An 'IOException' escaping the whole operation is
-- logged and the process exits with a failure status.
busClient :: forall m.
  (MonadIO m, MonadThrow m, MonadMask m) =>
  (Text, Text) ->
  (Commands IO -> IO ()) ->
  (Commands IO -> ByteString -> ByteString -> IO ()) ->
  IO () ->
  m ()
busClient loc@(host, port) onConn onData onQuit = do
  writeLock <- liftIO (MVar.newMVar ())
  let
    -- Run an action while holding the write lock.
    exclusively act = MVar.withMVar writeLock \_ -> act

    -- The command set bound to one connection handle.
    commandsFor hdl = Commands
      { subscribe = \topic -> exclusively do
          hPut hdl "s"
          writeLengthPrefixed hdl topic
      , publish = \topic payload -> exclusively do
          hPut hdl "p"
          writeLengthPrefixed hdl topic
          writeLengthPrefixed hdl payload
      }

    -- Read pairs of values forever; both reads are always attempted before
    -- the results are inspected.
    receiveLoop hdl cmds = forever do
      mTopic <- readLengthPrefixed hdl
      mPayload <- readLengthPrefixed hdl
      case (mTopic, mPayload) of
        (Just topic, Just payload) -> onData cmds topic payload
        _incomplete -> throwM (FigBusClientException "Connection to bus closed")

    -- The connect callback and the receive loop, side by side.
    session hdl =
      let cmds = commandsFor hdl
      in Async.concurrently_ (onConn cmds) (receiveLoop hdl cmds)

  exitOnIOError (client loc (pure \hdl -> (liftIO (session hdl), liftIO onQuit)))
  where
    -- Log any IOException raised by the wrapped action and terminate.
    exitOnIOError connectAndRun = catch connectAndRun \(err :: IOException) -> do
      log $ "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow err
      liftIO exitFailure

-- | Manual smoke test: connects to a bus on @localhost:32050@, subscribes to
-- @foo@, then publishes @42@ on @bar@ after every one-second delay, logging
-- whatever the bus delivers. Nothing extra is done on quit.
_testClient :: IO ()
_testClient = busClient ("localhost", "32050") onConnect onEvent (pure ())
  where
    -- Subscribe once, then publish in an endless delay/publish loop.
    onConnect :: Commands IO -> IO ()
    onConnect bus = do
      bus.subscribe "foo"
      forever do
        Conc.threadDelay 1000000
        bus.publish "bar" "42"

    -- Log every pair of values delivered by the bus.
    onEvent :: Commands IO -> ByteString -> ByteString -> IO ()
    onEvent _bus topic payload = log $ "Received: " <> tshow (topic, payload)