blob: f800cf76f56441835b85f6f202d65a90d00ea4eb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
module Fig.Bus.Binary (main) where
import Fig.Prelude
import Control.Concurrent.MVar as MVar
import qualified Data.List as List
import Data.ByteString (hGet)
import qualified Data.ByteString as BS
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.IORef as IORef
import Fig.Utils.Net
import Fig.Bus.Binary.Utils
newtype BusState = BusState
{ subscriptions :: Map EventType [Handle]
}
subscribe :: EventType -> Handle -> BusState -> BusState
subscribe ev h bs = bs
{ subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions
}
unsubscribe :: EventType -> Handle -> BusState -> BusState
unsubscribe ev h bs = bs
{ subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions
}
publish :: EventType -> ByteString -> BusState -> IO ()
publish ev d bs =
case Map.lookup ev bs.subscriptions of
Nothing -> pure ()
Just hs -> forM_ hs \h -> do
writeEvent h ev
writeLengthPrefixed h d
main :: (Maybe Text, Text) -> IO ()
main bind = do
st <- MVar.newMVar $ BusState { subscriptions = Map.empty }
server bind do
subs <- IORef.newIORef ([] :: [EventType])
pure \h peer ->
( do
log $ "Connected: " <> tshow peer
let
go = do
c <- hGet h 1
when (BS.length c == 1) do
case BS.head c of
115 -> readEvent h >>= \case
Just ev@(EventType e) -> do
log $ tshow peer <> " subscribing to: " <> tshow e
IORef.modifyIORef' subs (ev:)
MVar.modifyMVar_ st (pure . subscribe ev h)
go
_else -> log "Malformed subscription"
112 -> (,) <$> readEvent h <*> readLengthPrefixed h >>= \case
(Just ev@(EventType e), Just d) -> do
log $ tshow peer <> " publishing to: " <> tshow e
publish ev d =<< MVar.readMVar st
go
_else -> log "Malformed publish"
w -> log $ "Unknown command code: " <> tshow w
go
, do
log $ "Disconnected: " <> tshow peer
ss <- IORef.readIORef subs
MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss
)
|