summary refs log tree commit diff
path: root/fig-bus/src/Fig/Bus/SExpr.hs
blob: 49cee90922f50d92b0c8d714ee25e614f8a45f34 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
module Fig.Bus.SExpr (main) where

import Fig.Prelude

import Control.Concurrent.MVar as MVar
import qualified Control.Exception as Exception

import qualified Data.List as List
import Data.ByteString (hPut, hGetLine)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.IORef as IORef

import Fig.Utils.SExpr
import Fig.Utils.Net

-- | State of the bus: which handles are registered for which event.
newtype BusState = BusState
  { subscriptions :: Map SExpr [Handle]
    -- ^ Keyed by the event s-expression; the value lists the handles that
    -- 'publish' writes to when that event is published.
  }

-- | Register a handle as a subscriber of an event.
--
-- The handle is put at the front of the event's subscriber list; an event
-- that has no entry yet gets a fresh single-element list. No de-duplication
-- is performed, so registering the same handle twice lists it twice.
subscribe :: SExpr -> Handle -> BusState -> BusState
subscribe ev h bs = bs { subscriptions = Map.alter register ev current }
  where
    current = bs.subscriptions

    register Nothing = Just [h]
    register (Just existing) = Just (h : existing)

-- | Remove one registration of a handle from an event.
--
-- Only the first occurrence of the handle is removed, mirroring the fact
-- that 'subscribe' adds one occurrence per call. When the last subscriber of
-- an event is removed, the event's key is dropped from the map altogether so
-- that events nobody listens to any more do not accumulate as empty entries.
-- Unsubscribing from an event that has no entry leaves the state unchanged.
unsubscribe :: SExpr -> Handle -> BusState -> BusState
unsubscribe ev h bs = bs
  { subscriptions = Map.update prune ev bs.subscriptions
  }
  where
    -- Returning Nothing tells Map.update to delete the key.
    prune hs = case List.delete h hs of
      [] -> Nothing
      remaining -> Just remaining

-- | Send an event to every handle subscribed to it.
--
-- The message written is the s-expression list @(ev d...)@ rendered with
-- 'pretty', UTF-8 encoded and terminated by a newline. Nothing is written
-- when the event has no subscribers.
--
-- An 'Exception.IOException' raised while writing to one subscriber is
-- logged and that subscriber is skipped, so a single failing handle neither
-- stops delivery to the remaining subscribers nor propagates to the caller.
publish :: SExpr -> [SExpr] -> BusState -> IO ()
publish ev d bs =
  case Map.lookup ev bs.subscriptions of
    Nothing -> pure ()
    Just hs -> forM_ hs deliver
  where
    -- Rendered once: the bytes are the same for every subscriber.
    msg = encodeUtf8 $ pretty (SExprList $ ev:d) <> "\n"

    deliver :: Handle -> IO ()
    deliver h = hPut h msg `Exception.catch` report h

    report :: Handle -> Exception.IOException -> IO ()
    report h err =
      log $ "failed to deliver " <> pretty ev <> " to " <> tshow h <> ": " <> tshow err

-- | Run the s-expression message bus on the given bind address.
--
-- One 'BusState' is shared between all connections through an 'MVar'. For
-- each handle, 'server' is given a pair of actions:
--
-- * a read loop that takes one UTF-8 line at a time from the handle, parses
--   it as an s-expression and dispatches on its shape:
--
--     * @(ping ...)@ writes @(pong)@ back to the sender;
--     * @(sub ev)@ remembers @ev@ locally and registers the handle for it;
--     * @(pub ev d...)@ publishes @d...@ to the current subscribers of @ev@;
--     * any other s-expression, or a line that does not parse, is only logged.
--
--   A line that is not valid UTF-8 is turned into an exception by 'throwLeft'.
--
-- * a cleanup action that unsubscribes the handle from every event recorded
--   by the read loop. NOTE(review): presumably 'server' runs this when the
--   connection ends -- confirm against "Fig.Utils.Net".
main :: (Maybe Text, Text) -> IO ()
main bind = do
  st <- MVar.newMVar (BusState Map.empty)
  server bind do
    -- Events subscribed to through this setup action's handler, kept so the
    -- cleanup action knows what to undo.
    subs <- IORef.newIORef ([] :: [SExpr])
    pure \h peer ->
      let
        who = tshow peer

        dispatch line = case parseSExpr line of
          Nothing ->
            log $ who <> " sent malformed s-expression: " <> line
          Just (SExprList (SExprSymbol "ping":_)) -> do
            log $ who <> " pinged"
            hPut h (encodeUtf8 "(pong)\n")
          Just (SExprList [SExprSymbol "sub", ev]) -> do
            log $ who <> " subscribing to: " <> pretty ev
            IORef.modifyIORef' subs (ev :)
            MVar.modifyMVar_ st \bus -> pure (subscribe ev h bus)
          Just (SExprList (SExprSymbol "pub":ev:d)) -> do
            log $ who <> " publishing " <> pretty (SExprList d) <> " to: " <> pretty ev
            MVar.readMVar st >>= publish ev d
          Just other ->
            log $ who <> " sent invalid command: " <> pretty other

        readLoop = forever do
          raw <- hGetLine h
          line <- throwLeft id (decodeUtf8' raw)
          dispatch line

        cleanup = do
          mine <- IORef.readIORef subs
          MVar.modifyMVar_ st \bus ->
            pure $ foldr (\ev acc -> unsubscribe ev h acc) bus mine
      in
        (readLoop, cleanup)