From 7b101b8c3c0481d76733c77008b5b763ceb3b535 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Sun, 4 Aug 2024 02:09:37 -0400 Subject: Live monitor endpoint --- fig-frontend/fig-frontend.cabal | 1 + fig-frontend/src/Fig/Frontend.hs | 47 +++++++++++++++++++++++----- fig-frontend/src/Fig/Frontend/Utils.hs | 12 +++++++ fig-monitor-twitch/src/Fig/Monitor/Twitch.hs | 18 +++++------ 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/fig-frontend/fig-frontend.cabal b/fig-frontend/fig-frontend.cabal index f3c4f2e..9fc8f8e 100644 --- a/fig-frontend/fig-frontend.cabal +++ b/fig-frontend/fig-frontend.cabal @@ -42,6 +42,7 @@ common deps , wai , wai-extra , wai-middleware-static + , wai-websockets , warp , websockets , wuss diff --git a/fig-frontend/src/Fig/Frontend.hs b/fig-frontend/src/Fig/Frontend.hs index 51938dc..635df45 100644 --- a/fig-frontend/src/Fig/Frontend.hs +++ b/fig-frontend/src/Fig/Frontend.hs @@ -7,14 +7,18 @@ import Fig.Prelude import System.Random (randomRIO) import Control.Lens (use, (^?), Ixed (..)) +import qualified Control.Concurrent.Chan as Chan +import qualified Control.Concurrent.MVar as MVar import qualified Data.Text as Text import qualified Data.Text.Lazy as Text.L import qualified Data.ByteString.Base64 as BS.Base64 +import qualified Data.Set as Set import qualified Network.Wai as Wai import qualified Network.Wai.Middleware.Static as Wai.Static import qualified Network.Wai.Handler.Warp as Warp +import qualified Network.WebSockets as WS import qualified Web.Scotty as Sc @@ -25,22 +29,43 @@ import Fig.Frontend.Auth import Fig.Frontend.State import qualified Fig.Frontend.DB as DB +data LiveEvent + = LiveEventOnline Text + | LiveEventOffline Text + deriving (Show, Eq, Ord) + server :: Config -> (Text, Text) -> IO () server cfg busAddr = do log $ "Frontend server running on port " <> tshow cfg.port + liveEvents <- Chan.newChan @LiveEvent + currentlyLive <- MVar.newMVar Set.empty busClient busAddr (\cmds -> do log "Connected to bus!" - Warp.run cfg.port =<< app cfg cmds + cmds.subscribe [sexp|(monitor twitch stream online)|] + cmds.subscribe [sexp|(monitor twitch stream offline)|] + Warp.run cfg.port =<< app cfg cmds liveEvents currentlyLive + ) + (\_cmds d -> do + case d of + SExprList [ev, SExprString user] + | ev == [sexp|(monitor twitch stream online)|] -> do + log $ "Stream online: " <> user + MVar.modifyMVar_ currentlyLive (pure . Set.insert user) + Chan.writeChan liveEvents $ LiveEventOnline user + | ev == [sexp|(monitor twitch stream offline)|] -> do + log $ "Stream offline: " <> user + MVar.modifyMVar_ currentlyLive (pure . Set.delete user) + Chan.writeChan liveEvents $ LiveEventOffline user + _ -> log $ "Invalid event: " <> tshow d ) - (\_ _ -> pure ()) (pure ()) sexprStr :: Text -> SExpr sexprStr = SExprString . BS.Base64.encodeBase64 . encodeUtf8 -app :: Config -> Commands IO -> IO Wai.Application -app cfg cmds = do +app :: Config -> Commands IO -> Chan.Chan LiveEvent -> MVar.MVar (Set.Set Text) -> IO Wai.Application +app cfg cmds liveEvents currentlyLive = do log "Connecting to database..." db <- DB.connect cfg log "Connected! Server active." @@ -56,10 +81,6 @@ app cfg cmds = do DB.get db "motd" >>= \case Nothing -> Sc.text "" Just val -> Sc.text . Text.L.fromStrict $ decodeUtf8 val - Sc.get "/api/motd" do - DB.get db "motd" >>= \case - Nothing -> Sc.text "" - Just val -> Sc.text . Text.L.fromStrict $ decodeUtf8 val Sc.get "/api/catchphrase" do let catchphrases = [ "vtuber (male)" @@ -127,5 +148,15 @@ app cfg cmds = do log . tshow $ "partial handshake from " <> me <> " to " <> target DB.sadd db ("pokeinbox:" <> target) [me] Sc.text "partial" + Sc.get "/api/circle" do + live <- liftIO $ MVar.readMVar currentlyLive + Sc.text . Text.L.fromStrict . pretty . SExprList @Void $ sexprStr <$> Set.toList live + websocket "/api/circle/events" \conn -> do + c <- Chan.dupChan liveEvents + forever do + ev <- liftIO $ Chan.readChan c + WS.sendTextData conn $ case ev of + LiveEventOnline u -> "online " <> u + LiveEventOffline u -> "offline " <> u Sc.notFound do Sc.text "not found" diff --git a/fig-frontend/src/Fig/Frontend/Utils.hs b/fig-frontend/src/Fig/Frontend/Utils.hs index 3081ddb..090c6ba 100644 --- a/fig-frontend/src/Fig/Frontend/Utils.hs +++ b/fig-frontend/src/Fig/Frontend/Utils.hs @@ -5,12 +5,17 @@ module Fig.Frontend.Utils ( FigFrontendException(..) , loadConfig , Config(..) + , websocket , module Network.HTTP.Types.Status ) where import Fig.Prelude import Network.HTTP.Types.Status +import qualified Network.Wai.Handler.WebSockets as Wai.WS +import qualified Network.WebSockets as WS + +import qualified Web.Scotty as Sc import qualified Toml @@ -39,3 +44,10 @@ loadConfig :: FilePath -> IO Config loadConfig path = Toml.decodeFileEither configCodec path >>= \case Left err -> throwM . FigFrontendException $ tshow err Right config -> pure config + +websocket :: ByteString -> (WS.Connection -> IO ()) -> Sc.ScottyM () +websocket pat h = Sc.middleware $ Wai.WS.websocketsOr WS.defaultConnectionOptions handler + where + handler pending = if WS.requestPath (WS.pendingRequest pending) == pat + then WS.acceptRequest pending >>= h + else WS.rejectRequest pending "" diff --git a/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs b/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs index ef493c4..561c574 100644 --- a/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs +++ b/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs @@ -487,23 +487,21 @@ twitchChannelLiveMonitor cfg busAddr = do -- updateLive = fmap Map.fromList . runAuthed cfg $ forM cfg.monitor \user -> do -- liftIO . threadDelay $ 5 * 1000000 -- (user,) <$> userIsLive user - loop :: Map.Map Text Bool -> IO () - loop old = do + loop :: IO () + loop = do log "Updating liveness..." - new <- updateLive + live <- updateLive log "Update complete!" forM_ cfg.monitor \user -> - case (Map.lookup user old, Map.lookup user new) of - (Just False, Just True) -> do - log $ "Newly online: " <> user + case Map.lookup user live of + Just True -> do cmds.publish [sexp|(monitor twitch stream online)|] [SExprString user] - (Just True, Just False) -> do - log $ "Newly offline: " <> user + Just False -> do cmds.publish [sexp|(monitor twitch stream offline)|] [SExprString user] _ -> pure () threadDelay $ 5 * 60 * 1000000 - loop new - loop Map.empty + loop + loop ) (\_cmds _d -> pure ()) (pure ()) -- cgit v1.2.3