summaryrefslogtreecommitdiff
path: root/fig-bus
diff options
context:
space:
mode:
Diffstat (limited to 'fig-bus')
-rw-r--r--fig-bus/fig-bus.cabal3
-rw-r--r--fig-bus/main/Main.hs23
-rw-r--r--fig-bus/src/Fig/Bus/Binary.hs55
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Client.hs30
-rw-r--r--fig-bus/src/Fig/Bus/Binary/Utils.hs44
5 files changed, 85 insertions, 70 deletions
diff --git a/fig-bus/fig-bus.cabal b/fig-bus/fig-bus.cabal
index d8caf99..bfb2c7f 100644
--- a/fig-bus/fig-bus.cabal
+++ b/fig-bus/fig-bus.cabal
@@ -33,8 +33,9 @@ library
hs-source-dirs: src
exposed-modules:
Fig.Bus.SExp
- Fig.Bus.Binary
Fig.Bus.SExp.Client
+ Fig.Bus.Binary
+ Fig.Bus.Binary.Utils
Fig.Bus.Binary.Client
executable fig-bus
diff --git a/fig-bus/main/Main.hs b/fig-bus/main/Main.hs
index dcafffd..bf5c170 100644
--- a/fig-bus/main/Main.hs
+++ b/fig-bus/main/Main.hs
@@ -4,18 +4,28 @@ import Fig.Prelude
import Options.Applicative
-import qualified Fig.Bus
-import qualified Fig.Bus.Binary
+import qualified Fig.Bus.SExp as SExp
+import qualified Fig.Bus.Binary as Binary
+
+data Command = SExp | Binary
+
+parseCommand :: Parser Command
+parseCommand = subparser $ mconcat
+ [ command "sexp" $ info (pure SExp) (progDesc "Launch the s-expression bus")
+ , command "binary" $ info (pure Binary) (progDesc "Launch the binary bus")
+ ]
data Opts = Opts
- { host :: Text
- , port :: Text
+ { host :: !Text
+ , port :: !Text
+ , cmd :: !Command
}
parseOpts :: Parser Opts
parseOpts = Opts
<$> strOption (long "host" <> metavar "HOST" <> help "Interface to bind" <> value "localhost")
<*> strOption (long "port" <> metavar "PORT" <> help "Port to bind" <> showDefault <> value "32050")
+ <*> parseCommand
main :: IO ()
main = do
@@ -23,5 +33,6 @@ main = do
( fullDesc
<> header "fig-bus - a pub/sub message bus"
)
- -- Fig.Bus.main (Just opts.host, opts.port)
- Fig.Bus.Binary.main (Just opts.host, opts.port)
+ case opts.cmd of
+ SExp -> SExp.main (Just opts.host, opts.port)
+ Binary -> Binary.main (Just opts.host, opts.port)
diff --git a/fig-bus/src/Fig/Bus/Binary.hs b/fig-bus/src/Fig/Bus/Binary.hs
index 6329d01..5f465ec 100644
--- a/fig-bus/src/Fig/Bus/Binary.hs
+++ b/fig-bus/src/Fig/Bus/Binary.hs
@@ -5,19 +5,15 @@ import Fig.Prelude
import Control.Monad (when)
import Control.Concurrent.MVar as MVar
-import Data.Word (Word8, Word32)
-import Data.Bits ((.|.), (.&.), shiftL, shiftR)
import qualified Data.List as List
-import Data.ByteString (hPut, hGet)
+import Data.ByteString (hGet)
import qualified Data.ByteString as BS
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.IORef as IORef
import Fig.Utils.Net
-
-newtype EventType = EventType ByteString
- deriving (Show, Eq, Ord)
+import Fig.Bus.Binary.Utils
newtype BusState = BusState
{ subscriptions :: Map EventType [Handle]
@@ -33,39 +29,6 @@ unsubscribe ev h bs = bs
{ subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions
}
-intFromLEBytes :: [Word8] -> Int
-intFromLEBytes [] = 0
-intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x
-
-readLengthPrefixed :: Handle -> IO (Maybe ByteString)
-readLengthPrefixed h = do
- n <- hGet h 4
- log $ "parsing: " <> tshow n
- case intFromLEBytes (BS.unpack n) of
- 0 -> pure Nothing
- len -> do
- log $ "reading: " <> tshow len
- x <- hGet h len
- pure $ Just x
-
-readEvent :: Handle -> IO (Maybe EventType)
-readEvent h = do
- mb <- readLengthPrefixed h
- pure $ EventType <$> mb
-
-writeLengthPrefixed :: Handle -> ByteString -> IO ()
-writeLengthPrefixed h d = do
- let l :: Word32 = fromIntegral $ BS.length d
- let bytes =
- [ fromIntegral $ l .&. 0xff
- , fromIntegral $ shiftR l 8 .&. 0xff
- , fromIntegral $ shiftR l 16 .&. 0xff
- , fromIntegral $ shiftR l 24 .&. 0xff
- ]
- hPut h $ BS.pack bytes <> d
-
-writeEvent :: Handle -> EventType -> IO ()
-writeEvent h (EventType d) = writeLengthPrefixed h d
publish :: EventType -> ByteString -> BusState -> IO ()
publish ev d bs =
@@ -88,19 +51,19 @@ main bind = do
when (BS.length c == 1) do
case BS.head c of
115 -> readEvent h >>= \case
- Just ev -> do
- log $ tshow peer <> " subscribing to: " <> tshow ev
+ Just ev@(EventType e) -> do
+ log $ tshow peer <> " subscribing to: " <> tshow e
IORef.modifyIORef' subs (ev:)
MVar.modifyMVar_ st (pure . subscribe ev h)
go
- _ -> log "malformed subscription"
+ _else -> log "malformed subscription"
112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case
- (Just ev, Just d) -> do
- log $ tshow peer <> " publishing to: " <> tshow ev
+ (Just ev@(EventType e), Just d) -> do
+ log $ tshow peer <> " publishing to: " <> tshow e
publish ev d =<< MVar.readMVar st
go
- _ -> log "malformed publish"
- _ -> log "unknown"
+ _else -> log "malformed publish"
+ w -> log $ "unknown command code: " <> tshow w
go
, do
ss <- IORef.readIORef subs
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs
index a41f300..a2e601f 100644
--- a/fig-bus/src/Fig/Bus/Binary/Client.hs
+++ b/fig-bus/src/Fig/Bus/Binary/Client.hs
@@ -1,5 +1,3 @@
-{-# Language QuasiQuotes #-}
-
module Fig.Bus.Binary.Client (Commands(..), busClient) where
import Fig.Prelude
@@ -9,14 +7,14 @@ import System.Exit (exitFailure)
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
-import Data.ByteString (hPut, hGet)
+import Data.ByteString (hPut)
import Fig.Utils.Net
-import Fig.Bus.Binary
+import Fig.Bus.Binary.Utils
data Commands m = Commands
- { subscribe :: EventType -> m ()
- , publish :: EventType -> ByteString -> m ()
+ { subscribe :: !(ByteString -> m ())
+ , publish :: !(ByteString -> ByteString -> m ())
}
newtype FigBusClientException = FigBusClientException Text
@@ -27,16 +25,16 @@ busClient :: forall m.
(MonadIO m, MonadThrow m, MonadMask m) =>
(Text, Text) ->
(Commands IO -> IO ()) ->
- (Commands IO -> SExpr -> IO ()) ->
+ (Commands IO -> ByteString -> ByteString -> IO ()) ->
IO () ->
m ()
busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h ->
let
cmds = Commands
- { subscribe = \(EventType ev) -> do
+ { subscribe = \ev -> do
hPut h "s"
writeLengthPrefixed h ev
- , publish = \(EventType ev) d -> do
+ , publish = \ev d -> do
hPut h "p"
writeLengthPrefixed h ev
writeLengthPrefixed h d
@@ -45,11 +43,9 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu
( do
liftIO $ Async.concurrently_ (onConn cmds) do
forever do
-
- line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h)
- case parseSExpr line of
- Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line
- Just x -> liftIO $ onData cmds x
+ (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case
+ (Just ev, Just d) -> liftIO $ onData cmds ev d
+ _else -> throwM . FigBusClientException $ "Server sent malformed data"
, liftIO onQuit
)
where
@@ -60,10 +56,10 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu
_testClient :: IO ()
_testClient = busClient ("localhost", "32050")
(\cmds -> do
- cmds.subscribe [sexp|foo|]
+ cmds.subscribe "foo"
forever do
Conc.threadDelay 1000000
- cmds.publish [sexp|bar|] [[sexp|42|]]
+ cmds.publish "bar" "42"
)
- (\_cmds d -> log $ "Received: " <> pretty d)
+ (\_cmds ev d -> log $ "Received: " <> tshow (ev, d))
(pure ())
diff --git a/fig-bus/src/Fig/Bus/Binary/Utils.hs b/fig-bus/src/Fig/Bus/Binary/Utils.hs
new file mode 100644
index 0000000..734c7c9
--- /dev/null
+++ b/fig-bus/src/Fig/Bus/Binary/Utils.hs
@@ -0,0 +1,44 @@
+module Fig.Bus.Binary.Utils where
+
+import Fig.Prelude
+
+import Data.Word (Word8, Word32)
+import Data.Bits ((.|.), (.&.), shiftL, shiftR)
+import Data.ByteString (hPut, hGet)
+import qualified Data.ByteString as BS
+
+newtype EventType = EventType ByteString
+ deriving (Show, Eq, Ord)
+
+intFromLEBytes :: [Word8] -> Int
+intFromLEBytes [] = 0
+intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x
+
+readLengthPrefixed :: Handle -> IO (Maybe ByteString)
+readLengthPrefixed h = do
+ n <- hGet h 4
+ case intFromLEBytes (BS.unpack n) of
+ 0 -> pure Nothing
+ len -> do
+ x <- hGet h len
+ pure $ Just x
+
+
+readEvent :: Handle -> IO (Maybe EventType)
+readEvent h = do
+ mb <- readLengthPrefixed h
+ pure $ EventType <$> mb
+
+writeLengthPrefixed :: Handle -> ByteString -> IO ()
+writeLengthPrefixed h d = do
+ let l :: Word32 = fromIntegral $ BS.length d
+ let bytes =
+ [ fromIntegral $ l .&. 0xff
+ , fromIntegral $ shiftR l 8 .&. 0xff
+ , fromIntegral $ shiftR l 16 .&. 0xff
+ , fromIntegral $ shiftR l 24 .&. 0xff
+ ]
+ hPut h $ BS.pack bytes <> d
+
+writeEvent :: Handle -> EventType -> IO ()
+writeEvent h (EventType d) = writeLengthPrefixed h d