blob: 2102864996deaa3eee102ce13d044d25e0df3b08 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
module Fig.Bus (main) where
import Fig.Prelude
import Control.Concurrent.MVar as MVar
import qualified Data.List as List
import Data.ByteString (hPut, hGetLine)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.IORef as IORef
import Fig.Utils.SExpr
import Fig.Utils.Net
newtype BusState = BusState
{ subscriptions :: Map SExpr [Handle]
}
subscribe :: SExpr -> Handle -> BusState -> BusState
subscribe ev h bs = bs
{ subscriptions = Map.insertWith (<>) ev [h] bs.subscriptions
}
unsubscribe :: SExpr -> Handle -> BusState -> BusState
unsubscribe ev h bs = bs
{ subscriptions = Map.update (Just . List.delete h) ev bs.subscriptions
}
publish :: SExpr -> [SExpr] -> BusState -> IO ()
publish ev d bs =
case Map.lookup ev bs.subscriptions of
Nothing -> pure ()
Just hs -> forM_ hs \h -> do
hPut h . encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n"
main :: (Maybe Text, Text) -> IO ()
main bind = do
st <- MVar.newMVar $ BusState { subscriptions = Map.empty }
server bind do
subs <- IORef.newIORef ([] :: [SExpr])
pure \h peer ->
( do
forever do
line <- throwLeft id . decodeUtf8' =<< hGetLine h
case parseSExpr line of
Just (SExprList (SExprSymbol "ping":_)) -> do
log $ tshow peer <> " pinged"
hPut h . encodeUtf8 $ "(pong)\n"
Just (SExprList [SExprSymbol "sub", ev]) -> do
log $ tshow peer <> " subscribing to: " <> pretty ev
IORef.modifyIORef' subs (ev:)
MVar.modifyMVar_ st (pure . subscribe ev h)
Just (SExprList (SExprSymbol "pub":ev:d)) -> do
log $ tshow peer <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev
publish ev d =<< MVar.readMVar st
Just x -> log $ tshow peer <> " sent invalid command: " <> pretty x
Nothing -> log $ tshow peer <> " sent malformed s-expression: " <> line
, do
ss <- IORef.readIORef subs
MVar.modifyMVar_ st \bs -> pure $ foldr (`unsubscribe` h) bs ss
)
|