diff options
| -rw-r--r-- | fig-monitor-twitch/main/Main.hs | 10 | ||||
| -rw-r--r-- | fig-monitor-twitch/src/Fig/Monitor/Twitch.hs | 172 | ||||
| -rw-r--r-- | fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs | 24 | ||||
| -rw-r--r-- | flake.nix | 46 |
4 files changed, 195 insertions, 57 deletions
diff --git a/fig-monitor-twitch/main/Main.hs b/fig-monitor-twitch/main/Main.hs index 05f3cdb..b306586 100644 --- a/fig-monitor-twitch/main/Main.hs +++ b/fig-monitor-twitch/main/Main.hs @@ -10,14 +10,17 @@ import Fig.Monitor.Twitch.Utils data Command = Monitor | Chatbot - | RedirectServer + | LiveChecker + | RedirectServer Bool | Validate parseCommand :: Parser Command parseCommand = subparser $ mconcat [ command "monitor" $ info (pure Monitor) (progDesc "Launch the Twitch monitor") , command "chatbot" $ info (pure Chatbot) (progDesc "Launch the Twitch chatbot") - , command "user-token-server" $ info (pure RedirectServer) (progDesc "Launch a web server to handle authentication redirects") + , command "live-checker" $ info (pure LiveChecker) (progDesc "Launch the Twitch live status checker") + , command "user-token-server" $ info (pure $ RedirectServer True) (progDesc "Launch a web server to handle authentication redirects") + , command "user-token-server-read-only" $ info (pure $ RedirectServer False) (progDesc "Launch a web server to handle authentication redirects") , command "validate-endpoint" $ info (pure Validate) (progDesc "Test Twitch authentication") ] data Opts = Opts @@ -44,5 +47,6 @@ main = do case opts.command of Monitor -> twitchEventClient cfg (opts.busHost, opts.busPort) Chatbot -> twitchChatClient cfg (opts.busHost, opts.busPort) - RedirectServer -> userTokenRedirectServer cfg + LiveChecker -> twitchChannelLiveMonitor cfg (opts.busHost, opts.busPort) + RedirectServer rw -> userTokenRedirectServer cfg rw Validate -> twitchEndpointTest cfg diff --git a/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs b/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs index 17f2b8a..ef493c4 100644 --- a/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs +++ b/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs @@ -5,6 +5,7 @@ module Fig.Monitor.Twitch ( twitchEventClient , twitchChatClient + , twitchChannelLiveMonitor , twitchEndpointTest , userTokenRedirectServer ) where @@ -12,6 +13,7 @@ module Fig.Monitor.Twitch import Fig.Prelude import Control.Monad (unless) +import Control.Concurrent (threadDelay) import qualified Data.Maybe as Maybe import qualified Data.Text as Text @@ -57,6 +59,35 @@ loginToUserId login = do _ -> mempty maybe (throwM $ FigMonitorTwitchException "Failed to extract user ID") pure mid +usersAreLive :: [Text] -> Authed (Map.Map Text Bool) +usersAreLive users = do + log $ "Checking liveness for: " <> Text.intercalate " " users + res <- authedRequestJSON + "GET" + ( mconcat + [ "https://api.twitch.tv/helix/streams?type=live" + , mconcat $ ("&user_login="<>) <$> users + ] + ) + () + let mos = flip Aeson.parseMaybe res \obj -> do + obj .: "data" >>= \case + Aeson.Array os -> catMaybes . toList <$> forM os \case + Aeson.Object o -> Just <$> o .: "user_login" + _ -> pure Nothing + _ -> mempty + case mos of + Nothing -> throwM $ FigMonitorTwitchException "Failed to check liveness" + Just os -> Map.fromList <$> forM users \u -> do + let l = u `elem` os + log $ mconcat + [ u + , " is " + , if l then "" else "not " + , "live" + ] + pure (u, l) + subscribe :: Text -> Text -> Text -> Authed () subscribe sessionId event user = do log $ "Subscribing to " <> event <> " events for user ID: " <> user @@ -446,6 +477,37 @@ twitchEventClient cfg busAddr = do ) (pure ()) +twitchChannelLiveMonitor :: Config -> (Text, Text) -> IO () +twitchChannelLiveMonitor cfg busAddr = do + busClient busAddr + (\cmds -> do + let + updateLive :: IO (Map.Map Text Bool) + updateLive = runAuthed cfg $ usersAreLive cfg.monitor + -- updateLive = fmap Map.fromList . runAuthed cfg $ forM cfg.monitor \user -> do + -- liftIO . threadDelay $ 5 * 1000000 + -- (user,) <$> userIsLive user + loop :: Map.Map Text Bool -> IO () + loop old = do + log "Updating liveness..." + new <- updateLive + log "Update complete!" + forM_ cfg.monitor \user -> + case (Map.lookup user old, Map.lookup user new) of + (Just False, Just True) -> do + log $ "Newly online: " <> user + cmds.publish [sexp|(monitor twitch stream online)|] [SExprString user] + (Just True, Just False) -> do + log $ "Newly offline: " <> user + cmds.publish [sexp|(monitor twitch stream offline)|] [SExprString user] + _ -> pure () + threadDelay $ 5 * 60 * 1000000 + loop new + loop Map.empty + ) + (\_cmds _d -> pure ()) + (pure ()) + data IRCMessage = IRCMessage { tags :: Map.Map Text Text , prefix :: Maybe Text @@ -481,58 +543,60 @@ parseIRCMessage (Text.strip -> fullrest) = twitchChatClient :: Config -> (Text, Text) -> IO () twitchChatClient cfg busAddr = do log "Starting chatbot" - WS.runSecureClient "irc-ws.chat.twitch.tv" 443 "/" \conn -> do - WS.sendTextData conn $ "PASS oauth:" <> cfg.userToken - WS.sendTextData conn ("NICK lcolonq" :: Text) - WS.sendTextData conn ("CAP REQ :twitch.tv/commands twitch.tv/tags" :: Text) - WS.sendTextData conn $ "JOIN #" <> cfg.monitorChat - -- WS.sendTextData conn ("PRIVMSG #lcolonq :test the other direction" :: Text) - busClient busAddr - (\cmds -> do - cmds.subscribe [sexp|(monitor twitch chat outgoing)|] - forever do - resp <- WS.receiveData conn - forM (Text.lines resp) $ \line -> do - let msg = parseIRCMessage line - case msg.command of - "PING" -> do - log "Received PING, sending PONG" - WS.sendTextData conn $ "PONG :" <> mconcat msg.params - "CLEARCHAT" -> do - log "Received CLEARCHAT" - cmds.publish [sexp|(monitor twitch chat clear-chat)|] $ SExprString <$> msg.params - "NOTICE" -> do - log "Received NOTICE" - cmds.publish [sexp|(monitor twitch chat notice)|] $ SExprString <$> msg.params - "USERNOTICE" -> do - log "Received USERNOTICE" - cmds.publish [sexp|(monitor twitch chat user-notice)|] $ SExprString <$> msg.params - "PRIVMSG" - | Just displaynm <- Map.lookup "display-name" msg.tags - , Nothing <- Map.lookup "custom-reward-id" msg.tags -> do - cmds.publish [sexp|(monitor twitch chat incoming)|] - [ SExprString . BS.Base64.encodeBase64 $ encodeUtf8 displaynm - , SExprList $ (\(key, v) -> SExprList [SExprString key, SExprString v]) <$> Map.toList msg.tags - , SExprString . BS.Base64.encodeBase64 . encodeUtf8 . Text.unwords $ drop 1 msg.params - ] - _ -> pure () - ) - (\_cmds d -> do - case d of - SExprList [ev, SExprString msg] | ev == [sexp|(monitor twitch chat outgoing)|] -> do - log $ "Sending: " <> msg - WS.sendTextData conn $ mconcat - [ "PRIVMSG #" - , cfg.monitorChat - , " :" - , msg - ] - _ -> log $ "Invalid outgoing message: " <> tshow d - ) - (pure ()) + case headMay cfg.monitor of + Nothing -> pure () + Just chan -> WS.runSecureClient "irc-ws.chat.twitch.tv" 443 "/" \conn -> do + WS.sendTextData conn $ "PASS oauth:" <> cfg.userToken + WS.sendTextData conn ("NICK lcolonq" :: Text) + WS.sendTextData conn ("CAP REQ :twitch.tv/commands twitch.tv/tags" :: Text) + WS.sendTextData conn $ "JOIN #" <> chan + -- WS.sendTextData conn ("PRIVMSG #lcolonq :test the other direction" :: Text) + busClient busAddr + (\cmds -> do + cmds.subscribe [sexp|(monitor twitch chat outgoing)|] + forever do + resp <- WS.receiveData conn + forM (Text.lines resp) $ \line -> do + let msg = parseIRCMessage line + case msg.command of + "PING" -> do + log "Received PING, sending PONG" + WS.sendTextData conn $ "PONG :" <> mconcat msg.params + "CLEARCHAT" -> do + log "Received CLEARCHAT" + cmds.publish [sexp|(monitor twitch chat clear-chat)|] $ SExprString <$> msg.params + "NOTICE" -> do + log "Received NOTICE" + cmds.publish [sexp|(monitor twitch chat notice)|] $ SExprString <$> msg.params + "USERNOTICE" -> do + log "Received USERNOTICE" + cmds.publish [sexp|(monitor twitch chat user-notice)|] $ SExprString <$> msg.params + "PRIVMSG" + | Just displaynm <- Map.lookup "display-name" msg.tags + , Nothing <- Map.lookup "custom-reward-id" msg.tags -> do + cmds.publish [sexp|(monitor twitch chat incoming)|] + [ SExprString . BS.Base64.encodeBase64 $ encodeUtf8 displaynm + , SExprList $ (\(key, v) -> SExprList [SExprString key, SExprString v]) <$> Map.toList msg.tags + , SExprString . BS.Base64.encodeBase64 . encodeUtf8 . Text.unwords $ drop 1 msg.params + ] + _ -> pure () + ) + (\_cmds d -> do + case d of + SExprList [ev, SExprString msg] | ev == [sexp|(monitor twitch chat outgoing)|] -> do + log $ "Sending: " <> msg + WS.sendTextData conn $ mconcat + [ "PRIVMSG #" + , chan + , " :" + , msg + ] + _ -> log $ "Invalid outgoing message: " <> tshow d + ) + (pure ()) -userTokenRedirectServer :: Config -> IO () -userTokenRedirectServer cfg = do +userTokenRedirectServer :: Config -> Bool -> IO () +userTokenRedirectServer cfg rw = do log "Starting token redirect server on port 4444" Scotty.scottyOpts opts do Scotty.get "/" do @@ -548,7 +612,8 @@ userTokenRedirectServer cfg = do { Scotty.verbose = 0 , Scotty.settings = setPort 4444 (Scotty.settings def) } - scopes = + scopes = if rw then scopesReadWrite else scopesReadOnly + scopesReadWrite = [ "channel:manage:polls" , "channel:manage:predictions" , "channel:manage:redemptions" @@ -566,3 +631,6 @@ userTokenRedirectServer cfg = do , "chat:read" , "bits:read" ] + scopesReadOnly = + [ + ] diff --git a/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs b/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs index 59ba04c..b21a976 100644 --- a/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs +++ b/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs @@ -10,13 +10,16 @@ module Fig.Monitor.Twitch.Utils , authedRequestJSON , Authed , runAuthed + , userIsLiveScrape ) where import Fig.Prelude import Control.Monad.Reader (ReaderT, runReaderT) +import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BS.Lazy +import qualified Data.Map.Strict as Map import qualified Toml @@ -33,7 +36,7 @@ data Config = Config { clientId :: Text , userToken :: Text , userLogin :: Text - , monitorChat :: Text + , monitor :: [Text] } deriving (Show, Eq, Ord) configCodec :: Toml.TomlCodec Config @@ -42,7 +45,7 @@ configCodec = do userToken <- Toml.text "user_token" Toml..= (\a -> a.userToken) -- userIds <- Toml.arrayOf Toml._Text "user_ids" Toml..= (\a -> a.userIds) userLogin <- Toml.text "user_login" Toml..= (\a -> a.userLogin) - monitorChat <- Toml.text "monitor_chat" Toml..= (\a -> a.monitorChat) + monitor <- Toml.arrayOf Toml._Text "monitor" Toml..= (\a -> a.monitor) pure $ Config{..} loadConfig :: FilePath -> IO Config @@ -86,3 +89,20 @@ runAuthed :: Config -> Authed a -> IO a runAuthed config body = do manager <- HTTP.newManager HTTP.tlsManagerSettings runReaderT body.unAuthed RequestConfig{..} + +userIsLiveScrape :: Text -> Authed Bool +userIsLiveScrape user = do + rc <- ask + request <- liftIO . HTTP.parseRequest $ mconcat + [ "https://twitch.tv/" + , unpack user + ] + response <- liftIO $ HTTP.httpLbs request rc.manager + let res = BS.isInfixOf "\"isLiveBroadcast\":true" . BS.Lazy.toStrict $ HTTP.responseBody response + log $ mconcat + [ user + , " is " + , if res then "" else "not " + , "live" + ] + pure res @@ -93,6 +93,51 @@ }; }; }; + figMonitorTwitchLiveWatcherModule = { config, lib, ... }: + let + cfg = config.colonq.services.fig-monitor-twitch-live-watcher; + in { + options.colonq.services.fig-monitor-twitch-live-watcher = { + enable = lib.mkEnableOption "Enable the fig Twitch live watcher"; + busHost = lib.mkOption { + type = lib.types.str; + default = "127.0.0.1"; + description = "Message bus port"; + }; + busPort = lib.mkOption { + type = lib.types.port; + default = 32050; + description = "Address of message bus"; + }; + configFile = lib.mkOption { + type = lib.types.path; + description = "Path to config file"; + default = pkgs.writeText "fig-monitor-twitch.toml" '' + client_id = "" + user_token = "" + user_login = "" + monitor = [] + ''; + }; + }; + config = lib.mkIf cfg.enable { + systemd.services."colonq.fig-monitor-twitch-live-watcher" = { + wantedBy = ["multi-user.target"]; + after = ["colonq.fig-bus.service"]; + serviceConfig = { + Restart = "on-failure"; + ExecStart = "${haskellPackages.fig-monitor-twitch-live-watcher}/bin/fig-monitor-twitch live-checker --bus-host ${cfg.busHost} --bus-port ${toString cfg.busPort} --config ${cfg.configFile}"; + DynamicUser = "yes"; + RuntimeDirectory = "colonq.fig-monitor-twitch-live-watcher"; + RuntimeDirectoryMode = "0755"; + StateDirectory = "colonq.fig-monitor-twitch-live-watcher"; + StateDirectoryMode = "0700"; + CacheDirectory = "colonq.fig-monitor-twitch-live-watcher"; + CacheDirectoryMode = "0750"; + }; + }; + }; + }; figMonitorDiscordModule = { config, lib, ... }: let cfg = config.colonq.services.fig-monitor-discord; @@ -293,6 +338,7 @@ ]; withHoogle = true; buildInputs = [ + haskellPackages.cabal-install haskellPackages.haskell-language-server pkgs.nodejs (purescript.command {}) |
