summaryrefslogtreecommitdiff
path: root/fig-bus/src/Fig/Bus/SExpr/Client.hs
blob: bdb6a3ecca741fd6b9ae4cff311c4eb2ea0b6b30 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
{-# Language QuasiQuotes #-}

module Fig.Bus.SExpr.Client (Commands(..), busClient) where

import Fig.Prelude

import System.Exit (exitFailure)

import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.MVar as MVar

import Data.ByteString (hPut, hGetLine)

import Fig.Utils.Net
import Fig.Utils.SExpr

-- | The set of operations a bus client can issue, as actions in the monad @m@.
--
-- 'busClient' builds a @Commands IO@ bound to its connection handle and hands
-- it to the caller's callbacks.
data Commands m = Commands
  { ping :: m ()
    -- ^ As populated by 'busClient': sends the s-expression @(ping)@.
  , subscribe :: SExpr -> m ()
    -- ^ As populated by 'busClient': given an event @ev@, sends @(sub ev)@.
  , publish :: SExpr -> [SExpr] -> m ()
    -- ^ As populated by 'busClient': given an event @ev@ and a list @d@,
    -- sends @(pub ev ...)@ with the elements of @d@ spliced in after @ev@.
  }

-- | Error raised by the bus client itself; the 'Text' payload is a
-- human-readable message.
--
-- In this module it is thrown by the read loop of 'busClient' when a line
-- received from the server does not parse as an s-expression.
--
-- NOTE(review): this type is not in the module's export list, so code outside
-- this module cannot name it in a handler — confirm whether that is intended.
newtype FigBusClientException = FigBusClientException Text
  deriving (Show, Eq, Ord)
instance Exception FigBusClientException

-- | Run a bus client against the server at the given @(host, port)@ pair.
--
-- Arguments, in order:
--
-- 1. the @(host, port)@ location, passed unchanged to 'client';
-- 2. @onConn@, run with the connection's 'Commands' concurrently with the
--    read loop;
-- 3. @onData@, called with the 'Commands' and each s-expression parsed from
--    a line read from the connection;
-- 4. @onQuit@, supplied to 'client' as the second component of the session
--    pair (presumably run on shutdown — confirm against "Fig.Utils.Net").
--
-- Every command writes one line (the pretty-printed s-expression followed by
-- a newline, UTF-8 encoded) while holding a shared 'MVar.MVar', so only one
-- thread writes to the handle at a time.
--
-- The read loop throws 'FigBusClientException' when a line is not a valid
-- s-expression. A line that is not valid UTF-8 is passed through
-- @throwLeft id@ (presumably rethrowing the decoding error — confirm).
--
-- Any 'IOException' that escapes the call to 'client' is logged and the
-- process terminates via 'exitFailure'.
--
-- NOTE(review): the handler wraps the whole 'client' call, so if 'client'
-- lets session errors (e.g. end of file while reading) propagate, they would
-- also be reported with the "Failed to connect" message — confirm.
busClient :: forall m.
  (MonadIO m, MonadThrow m, MonadMask m) =>
  (Text, Text) ->
  (Commands IO -> IO ()) ->
  (Commands IO -> SExpr -> IO ()) ->
  IO () ->
  m ()
busClient loc@(host, port) onConn onData onQuit = do
  writeLock <- liftIO (MVar.newMVar ())
  exitOnIOError (client loc (pure (session writeLock)))
  where
    -- The (main action, quit action) pair handed to 'client' for handle @h@.
    session writeLock h =
      ( liftIO (Async.concurrently_ (onConn cmds) receiveLoop)
      , liftIO onQuit
      )
      where
        -- Write one s-expression as a single line while holding the lock.
        transmit x =
          MVar.withMVar writeLock \_ ->
            hPut h (encodeUtf8 (pretty x <> "\n"))

        cmds = Commands
          { ping = transmit [sexp|(ping)|]
          , subscribe = \ev -> transmit [sexp|(sub ,ev)|]
          , publish = \ev d -> transmit [sexp|(pub ,ev ,@d)|]
          }

        -- Read, decode and dispatch lines until an exception ends the loop.
        receiveLoop = forever do
          raw <- hGetLine h
          line <- throwLeft id (decodeUtf8' raw)
          case parseSExpr line of
            Just x -> onData cmds x
            Nothing ->
              throwM (FigBusClientException ("Server sent malformed s-expression: " <> line))

    -- Log an escaping 'IOException' and terminate the process with failure.
    exitOnIOError body = body `catch` \(e :: IOException) -> do
      log (failureMessage e)
      liftIO exitFailure

    failureMessage e =
      "Failed to connect to bus at " <> host <> ":" <> port <> ": " <> tshow e

-- | Manual smoke test for 'busClient' against @localhost:32050@.
--
-- Subscribes to @foo@, then publishes @bar@ with the single datum @42@ after
-- every 1000000-microsecond delay, forever. Each s-expression delivered to
-- the data callback is logged; the quit action does nothing.
_testClient :: IO ()
_testClient = busClient ("localhost", "32050") onConnect onMessage (pure ())
  where
    onConnect :: Commands IO -> IO ()
    onConnect cmds = do
      cmds.subscribe [sexp|foo|]
      forever do
        Conc.threadDelay 1000000
        cmds.publish [sexp|bar|] [[sexp|42|]]

    onMessage :: Commands IO -> SExpr -> IO ()
    onMessage _cmds d = log ("Received: " <> pretty d)