diff options
Diffstat (limited to 'fig-bus/src/Fig/Bus/Binary')
| -rw-r--r-- | fig-bus/src/Fig/Bus/Binary/Client.hs | 30 | ||||
| -rw-r--r-- | fig-bus/src/Fig/Bus/Binary/Utils.hs | 44 |
2 files changed, 57 insertions, 17 deletions
diff --git a/fig-bus/src/Fig/Bus/Binary/Client.hs b/fig-bus/src/Fig/Bus/Binary/Client.hs index a41f300..a2e601f 100644 --- a/fig-bus/src/Fig/Bus/Binary/Client.hs +++ b/fig-bus/src/Fig/Bus/Binary/Client.hs @@ -1,5 +1,3 @@ -{-# Language QuasiQuotes #-} - module Fig.Bus.Binary.Client (Commands(..), busClient) where import Fig.Prelude @@ -9,14 +7,14 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as Conc import qualified Control.Concurrent.Async as Async -import Data.ByteString (hPut, hGet) +import Data.ByteString (hPut) import Fig.Utils.Net -import Fig.Bus.Binary +import Fig.Bus.Binary.Utils data Commands m = Commands - { subscribe :: EventType -> m () - , publish :: EventType -> ByteString -> m () + { subscribe :: !(ByteString -> m ()) + , publish :: !(ByteString -> ByteString -> m ()) } newtype FigBusClientException = FigBusClientException Text @@ -27,16 +25,16 @@ busClient :: forall m. (MonadIO m, MonadThrow m, MonadMask m) => (Text, Text) -> (Commands IO -> IO ()) -> - (Commands IO -> SExpr -> IO ()) -> + (Commands IO -> ByteString -> ByteString -> IO ()) -> IO () -> m () busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pure \h -> let cmds = Commands - { subscribe = \(EventType ev) -> do + { subscribe = \ev -> do hPut h "s" writeLengthPrefixed h ev - , publish = \(EventType ev) d -> do + , publish = \ev d -> do hPut h "p" writeLengthPrefixed h ev writeLengthPrefixed h d @@ -45,11 +43,9 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu ( do liftIO $ Async.concurrently_ (onConn cmds) do forever do - - line <- throwLeft id . decodeUtf8' =<< liftIO (hGetLine h) - case parseSExpr line of - Nothing -> throwM . FigBusClientException $ "Server sent malformed s-expression: " <> line - Just x -> liftIO $ onData cmds x + (,) <$> readLengthPrefixed h <*> readLengthPrefixed h >>= \case + (Just ev, Just d) -> liftIO $ onData cmds ev d + _else -> throwM . FigBusClientException $ "Server sent malformed data" , liftIO onQuit ) where @@ -60,10 +56,10 @@ busClient loc@(host, port) onConn onData onQuit = catchFailure . client loc $ pu _testClient :: IO () _testClient = busClient ("localhost", "32050") (\cmds -> do - cmds.subscribe [sexp|foo|] + cmds.subscribe "foo" forever do Conc.threadDelay 1000000 - cmds.publish [sexp|bar|] [[sexp|42|]] + cmds.publish "bar" "42" ) - (\_cmds d -> log $ "Received: " <> pretty d) + (\_cmds ev d -> log $ "Received: " <> tshow (ev, d)) (pure ()) diff --git a/fig-bus/src/Fig/Bus/Binary/Utils.hs b/fig-bus/src/Fig/Bus/Binary/Utils.hs new file mode 100644 index 0000000..734c7c9 --- /dev/null +++ b/fig-bus/src/Fig/Bus/Binary/Utils.hs @@ -0,0 +1,44 @@ +module Fig.Bus.Binary.Utils where + +import Fig.Prelude + +import Data.Word (Word8, Word32) +import Data.Bits ((.|.), (.&.), shiftL, shiftR) +import Data.ByteString (hPut, hGet) +import qualified Data.ByteString as BS + +newtype EventType = EventType ByteString + deriving (Show, Eq, Ord) + +intFromLEBytes :: [Word8] -> Int +intFromLEBytes [] = 0 +intFromLEBytes (x:xs) = shiftL (intFromLEBytes xs) 8 .|. fromIntegral x + +readLengthPrefixed :: Handle -> IO (Maybe ByteString) +readLengthPrefixed h = do + n <- hGet h 4 + case intFromLEBytes (BS.unpack n) of + 0 -> pure Nothing + len -> do + x <- hGet h len + pure $ Just x + + +readEvent :: Handle -> IO (Maybe EventType) +readEvent h = do + mb <- readLengthPrefixed h + pure $ EventType <$> mb + +writeLengthPrefixed :: Handle -> ByteString -> IO () +writeLengthPrefixed h d = do + let l :: Word32 = fromIntegral $ BS.length d + let bytes = + [ fromIntegral $ l .&. 0xff + , fromIntegral $ shiftR l 8 .&. 0xff + , fromIntegral $ shiftR l 16 .&. 0xff + , fromIntegral $ shiftR l 24 .&. 0xff + ] + hPut h $ BS.pack bytes <> d + +writeEvent :: Handle -> EventType -> IO () +writeEvent h (EventType d) = writeLengthPrefixed h d |
