summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLLLL Colonq <llll@colonq>2024-08-01 19:13:16 -0400
committerLLLL Colonq <llll@colonq>2024-08-01 19:13:16 -0400
commit3a795b76a703a048529313ea45e323da82f66d34 (patch)
tree7ca774841bd8780e99b0914fcd6cac09b1a6d9eb
parent7ffb7b021eec46f2d714e04b47d752012e1bf8ea (diff)
Add Twitch live monitor
-rw-r--r--fig-monitor-twitch/main/Main.hs10
-rw-r--r--fig-monitor-twitch/src/Fig/Monitor/Twitch.hs172
-rw-r--r--fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs24
-rw-r--r--flake.nix46
4 files changed, 195 insertions, 57 deletions
diff --git a/fig-monitor-twitch/main/Main.hs b/fig-monitor-twitch/main/Main.hs
index 05f3cdb..b306586 100644
--- a/fig-monitor-twitch/main/Main.hs
+++ b/fig-monitor-twitch/main/Main.hs
@@ -10,14 +10,17 @@ import Fig.Monitor.Twitch.Utils
data Command
= Monitor
| Chatbot
- | RedirectServer
+ | LiveChecker
+ | RedirectServer Bool
| Validate
parseCommand :: Parser Command
parseCommand = subparser $ mconcat
[ command "monitor" $ info (pure Monitor) (progDesc "Launch the Twitch monitor")
, command "chatbot" $ info (pure Chatbot) (progDesc "Launch the Twitch chatbot")
- , command "user-token-server" $ info (pure RedirectServer) (progDesc "Launch a web server to handle authentication redirects")
+ , command "live-checker" $ info (pure LiveChecker) (progDesc "Launch the Twitch live status checker")
+ , command "user-token-server" $ info (pure $ RedirectServer True) (progDesc "Launch a web server to handle authentication redirects")
+ , command "user-token-server-read-only" $ info (pure $ RedirectServer False) (progDesc "Launch a web server to handle authentication redirects")
, command "validate-endpoint" $ info (pure Validate) (progDesc "Test Twitch authentication")
]
data Opts = Opts
@@ -44,5 +47,6 @@ main = do
case opts.command of
Monitor -> twitchEventClient cfg (opts.busHost, opts.busPort)
Chatbot -> twitchChatClient cfg (opts.busHost, opts.busPort)
- RedirectServer -> userTokenRedirectServer cfg
+ LiveChecker -> twitchChannelLiveMonitor cfg (opts.busHost, opts.busPort)
+ RedirectServer rw -> userTokenRedirectServer cfg rw
Validate -> twitchEndpointTest cfg
diff --git a/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs b/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs
index 17f2b8a..ef493c4 100644
--- a/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs
+++ b/fig-monitor-twitch/src/Fig/Monitor/Twitch.hs
@@ -5,6 +5,7 @@
module Fig.Monitor.Twitch
( twitchEventClient
, twitchChatClient
+ , twitchChannelLiveMonitor
, twitchEndpointTest
, userTokenRedirectServer
) where
@@ -12,6 +13,7 @@ module Fig.Monitor.Twitch
import Fig.Prelude
import Control.Monad (unless)
+import Control.Concurrent (threadDelay)
import qualified Data.Maybe as Maybe
import qualified Data.Text as Text
@@ -57,6 +59,35 @@ loginToUserId login = do
_ -> mempty
maybe (throwM $ FigMonitorTwitchException "Failed to extract user ID") pure mid
+usersAreLive :: [Text] -> Authed (Map.Map Text Bool)
+usersAreLive users = do
+ log $ "Checking liveness for: " <> Text.intercalate " " users
+ res <- authedRequestJSON
+ "GET"
+ ( mconcat
+ [ "https://api.twitch.tv/helix/streams?type=live"
+ , mconcat $ ("&user_login="<>) <$> users
+ ]
+ )
+ ()
+ let mos = flip Aeson.parseMaybe res \obj -> do
+ obj .: "data" >>= \case
+ Aeson.Array os -> catMaybes . toList <$> forM os \case
+ Aeson.Object o -> Just <$> o .: "user_login"
+ _ -> pure Nothing
+ _ -> mempty
+ case mos of
+ Nothing -> throwM $ FigMonitorTwitchException "Failed to check liveness"
+ Just os -> Map.fromList <$> forM users \u -> do
+ let l = u `elem` os
+ log $ mconcat
+ [ u
+ , " is "
+ , if l then "" else "not "
+ , "live"
+ ]
+ pure (u, l)
+
subscribe :: Text -> Text -> Text -> Authed ()
subscribe sessionId event user = do
log $ "Subscribing to " <> event <> " events for user ID: " <> user
@@ -446,6 +477,37 @@ twitchEventClient cfg busAddr = do
)
(pure ())
+twitchChannelLiveMonitor :: Config -> (Text, Text) -> IO ()
+twitchChannelLiveMonitor cfg busAddr = do
+ busClient busAddr
+ (\cmds -> do
+ let
+ updateLive :: IO (Map.Map Text Bool)
+ updateLive = runAuthed cfg $ usersAreLive cfg.monitor
+ -- updateLive = fmap Map.fromList . runAuthed cfg $ forM cfg.monitor \user -> do
+ -- liftIO . threadDelay $ 5 * 1000000
+ -- (user,) <$> userIsLive user
+ loop :: Map.Map Text Bool -> IO ()
+ loop old = do
+ log "Updating liveness..."
+ new <- updateLive
+ log "Update complete!"
+ forM_ cfg.monitor \user ->
+ case (Map.lookup user old, Map.lookup user new) of
+ (Just False, Just True) -> do
+ log $ "Newly online: " <> user
+ cmds.publish [sexp|(monitor twitch stream online)|] [SExprString user]
+ (Just True, Just False) -> do
+ log $ "Newly offline: " <> user
+ cmds.publish [sexp|(monitor twitch stream offline)|] [SExprString user]
+ _ -> pure ()
+ threadDelay $ 5 * 60 * 1000000
+ loop new
+ loop Map.empty
+ )
+ (\_cmds _d -> pure ())
+ (pure ())
+
data IRCMessage = IRCMessage
{ tags :: Map.Map Text Text
, prefix :: Maybe Text
@@ -481,58 +543,60 @@ parseIRCMessage (Text.strip -> fullrest) =
twitchChatClient :: Config -> (Text, Text) -> IO ()
twitchChatClient cfg busAddr = do
log "Starting chatbot"
- WS.runSecureClient "irc-ws.chat.twitch.tv" 443 "/" \conn -> do
- WS.sendTextData conn $ "PASS oauth:" <> cfg.userToken
- WS.sendTextData conn ("NICK lcolonq" :: Text)
- WS.sendTextData conn ("CAP REQ :twitch.tv/commands twitch.tv/tags" :: Text)
- WS.sendTextData conn $ "JOIN #" <> cfg.monitorChat
- -- WS.sendTextData conn ("PRIVMSG #lcolonq :test the other direction" :: Text)
- busClient busAddr
- (\cmds -> do
- cmds.subscribe [sexp|(monitor twitch chat outgoing)|]
- forever do
- resp <- WS.receiveData conn
- forM (Text.lines resp) $ \line -> do
- let msg = parseIRCMessage line
- case msg.command of
- "PING" -> do
- log "Received PING, sending PONG"
- WS.sendTextData conn $ "PONG :" <> mconcat msg.params
- "CLEARCHAT" -> do
- log "Received CLEARCHAT"
- cmds.publish [sexp|(monitor twitch chat clear-chat)|] $ SExprString <$> msg.params
- "NOTICE" -> do
- log "Received NOTICE"
- cmds.publish [sexp|(monitor twitch chat notice)|] $ SExprString <$> msg.params
- "USERNOTICE" -> do
- log "Received USERNOTICE"
- cmds.publish [sexp|(monitor twitch chat user-notice)|] $ SExprString <$> msg.params
- "PRIVMSG"
- | Just displaynm <- Map.lookup "display-name" msg.tags
- , Nothing <- Map.lookup "custom-reward-id" msg.tags -> do
- cmds.publish [sexp|(monitor twitch chat incoming)|]
- [ SExprString . BS.Base64.encodeBase64 $ encodeUtf8 displaynm
- , SExprList $ (\(key, v) -> SExprList [SExprString key, SExprString v]) <$> Map.toList msg.tags
- , SExprString . BS.Base64.encodeBase64 . encodeUtf8 . Text.unwords $ drop 1 msg.params
- ]
- _ -> pure ()
- )
- (\_cmds d -> do
- case d of
- SExprList [ev, SExprString msg] | ev == [sexp|(monitor twitch chat outgoing)|] -> do
- log $ "Sending: " <> msg
- WS.sendTextData conn $ mconcat
- [ "PRIVMSG #"
- , cfg.monitorChat
- , " :"
- , msg
- ]
- _ -> log $ "Invalid outgoing message: " <> tshow d
- )
- (pure ())
+ case headMay cfg.monitor of
+ Nothing -> pure ()
+ Just chan -> WS.runSecureClient "irc-ws.chat.twitch.tv" 443 "/" \conn -> do
+ WS.sendTextData conn $ "PASS oauth:" <> cfg.userToken
+ WS.sendTextData conn ("NICK lcolonq" :: Text)
+ WS.sendTextData conn ("CAP REQ :twitch.tv/commands twitch.tv/tags" :: Text)
+ WS.sendTextData conn $ "JOIN #" <> chan
+ -- WS.sendTextData conn ("PRIVMSG #lcolonq :test the other direction" :: Text)
+ busClient busAddr
+ (\cmds -> do
+ cmds.subscribe [sexp|(monitor twitch chat outgoing)|]
+ forever do
+ resp <- WS.receiveData conn
+ forM (Text.lines resp) $ \line -> do
+ let msg = parseIRCMessage line
+ case msg.command of
+ "PING" -> do
+ log "Received PING, sending PONG"
+ WS.sendTextData conn $ "PONG :" <> mconcat msg.params
+ "CLEARCHAT" -> do
+ log "Received CLEARCHAT"
+ cmds.publish [sexp|(monitor twitch chat clear-chat)|] $ SExprString <$> msg.params
+ "NOTICE" -> do
+ log "Received NOTICE"
+ cmds.publish [sexp|(monitor twitch chat notice)|] $ SExprString <$> msg.params
+ "USERNOTICE" -> do
+ log "Received USERNOTICE"
+ cmds.publish [sexp|(monitor twitch chat user-notice)|] $ SExprString <$> msg.params
+ "PRIVMSG"
+ | Just displaynm <- Map.lookup "display-name" msg.tags
+ , Nothing <- Map.lookup "custom-reward-id" msg.tags -> do
+ cmds.publish [sexp|(monitor twitch chat incoming)|]
+ [ SExprString . BS.Base64.encodeBase64 $ encodeUtf8 displaynm
+ , SExprList $ (\(key, v) -> SExprList [SExprString key, SExprString v]) <$> Map.toList msg.tags
+ , SExprString . BS.Base64.encodeBase64 . encodeUtf8 . Text.unwords $ drop 1 msg.params
+ ]
+ _ -> pure ()
+ )
+ (\_cmds d -> do
+ case d of
+ SExprList [ev, SExprString msg] | ev == [sexp|(monitor twitch chat outgoing)|] -> do
+ log $ "Sending: " <> msg
+ WS.sendTextData conn $ mconcat
+ [ "PRIVMSG #"
+ , chan
+ , " :"
+ , msg
+ ]
+ _ -> log $ "Invalid outgoing message: " <> tshow d
+ )
+ (pure ())
-userTokenRedirectServer :: Config -> IO ()
-userTokenRedirectServer cfg = do
+userTokenRedirectServer :: Config -> Bool -> IO ()
+userTokenRedirectServer cfg rw = do
log "Starting token redirect server on port 4444"
Scotty.scottyOpts opts do
Scotty.get "/" do
@@ -548,7 +612,8 @@ userTokenRedirectServer cfg = do
{ Scotty.verbose = 0
, Scotty.settings = setPort 4444 (Scotty.settings def)
}
- scopes =
+ scopes = if rw then scopesReadWrite else scopesReadOnly
+ scopesReadWrite =
[ "channel:manage:polls"
, "channel:manage:predictions"
, "channel:manage:redemptions"
@@ -566,3 +631,6 @@ userTokenRedirectServer cfg = do
, "chat:read"
, "bits:read"
]
+ scopesReadOnly =
+ [
+ ]
diff --git a/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs b/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs
index 59ba04c..b21a976 100644
--- a/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs
+++ b/fig-monitor-twitch/src/Fig/Monitor/Twitch/Utils.hs
@@ -10,13 +10,16 @@ module Fig.Monitor.Twitch.Utils
, authedRequestJSON
, Authed
, runAuthed
+ , userIsLiveScrape
) where
import Fig.Prelude
import Control.Monad.Reader (ReaderT, runReaderT)
+import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BS.Lazy
+import qualified Data.Map.Strict as Map
import qualified Toml
@@ -33,7 +36,7 @@ data Config = Config
{ clientId :: Text
, userToken :: Text
, userLogin :: Text
- , monitorChat :: Text
+ , monitor :: [Text]
} deriving (Show, Eq, Ord)
configCodec :: Toml.TomlCodec Config
@@ -42,7 +45,7 @@ configCodec = do
userToken <- Toml.text "user_token" Toml..= (\a -> a.userToken)
-- userIds <- Toml.arrayOf Toml._Text "user_ids" Toml..= (\a -> a.userIds)
userLogin <- Toml.text "user_login" Toml..= (\a -> a.userLogin)
- monitorChat <- Toml.text "monitor_chat" Toml..= (\a -> a.monitorChat)
+ monitor <- Toml.arrayOf Toml._Text "monitor" Toml..= (\a -> a.monitor)
pure $ Config{..}
loadConfig :: FilePath -> IO Config
@@ -86,3 +89,20 @@ runAuthed :: Config -> Authed a -> IO a
runAuthed config body = do
manager <- HTTP.newManager HTTP.tlsManagerSettings
runReaderT body.unAuthed RequestConfig{..}
+
+userIsLiveScrape :: Text -> Authed Bool
+userIsLiveScrape user = do
+ rc <- ask
+ request <- liftIO . HTTP.parseRequest $ mconcat
+ [ "https://twitch.tv/"
+ , unpack user
+ ]
+ response <- liftIO $ HTTP.httpLbs request rc.manager
+ let res = BS.isInfixOf "\"isLiveBroadcast\":true" . BS.Lazy.toStrict $ HTTP.responseBody response
+ log $ mconcat
+ [ user
+ , " is "
+ , if res then "" else "not "
+ , "live"
+ ]
+ pure res
diff --git a/flake.nix b/flake.nix
index 7012071..5ab5b90 100644
--- a/flake.nix
+++ b/flake.nix
@@ -93,6 +93,51 @@
};
};
};
+ figMonitorTwitchLiveWatcherModule = { config, lib, ... }:
+ let
+ cfg = config.colonq.services.fig-monitor-twitch-live-watcher;
+ in {
+ options.colonq.services.fig-monitor-twitch-live-watcher = {
+ enable = lib.mkEnableOption "Enable the fig Twitch live watcher";
+ busHost = lib.mkOption {
+ type = lib.types.str;
+ default = "127.0.0.1";
+ description = "Message bus port";
+ };
+ busPort = lib.mkOption {
+ type = lib.types.port;
+ default = 32050;
+ description = "Address of message bus";
+ };
+ configFile = lib.mkOption {
+ type = lib.types.path;
+ description = "Path to config file";
+ default = pkgs.writeText "fig-monitor-twitch.toml" ''
+ client_id = ""
+ user_token = ""
+ user_login = ""
+ monitor = []
+ '';
+ };
+ };
+ config = lib.mkIf cfg.enable {
+ systemd.services."colonq.fig-monitor-twitch-live-watcher" = {
+ wantedBy = ["multi-user.target"];
+ after = ["colonq.fig-bus.service"];
+ serviceConfig = {
+ Restart = "on-failure";
+ ExecStart = "${haskellPackages.fig-monitor-twitch-live-watcher}/bin/fig-monitor-twitch live-checker --bus-host ${cfg.busHost} --bus-port ${toString cfg.busPort} --config ${cfg.configFile}";
+ DynamicUser = "yes";
+ RuntimeDirectory = "colonq.fig-monitor-twitch-live-watcher";
+ RuntimeDirectoryMode = "0755";
+ StateDirectory = "colonq.fig-monitor-twitch-live-watcher";
+ StateDirectoryMode = "0700";
+ CacheDirectory = "colonq.fig-monitor-twitch-live-watcher";
+ CacheDirectoryMode = "0750";
+ };
+ };
+ };
+ };
figMonitorDiscordModule = { config, lib, ... }:
let
cfg = config.colonq.services.fig-monitor-discord;
@@ -293,6 +338,7 @@
];
withHoogle = true;
buildInputs = [
+ haskellPackages.cabal-install
haskellPackages.haskell-language-server
pkgs.nodejs
(purescript.command {})