summaryrefslogtreecommitdiff
path: root/deps/discord-haskell/src/Discord/Internal/Gateway/EventLoop.hs
blob: dfcd00fd017ce4ac13481425cbe8a15ae12fc336 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}

-- | Provides logic code for interacting with the Discord websocket
--   gateway. Realistically, this is probably lower level than most
--   people will need
module Discord.Internal.Gateway.EventLoop where

import Prelude hiding (log)

import Control.Monad (forever, void)
import Control.Monad.Random (getRandomR)
import Control.Concurrent.Async (race)
import Control.Concurrent.Chan
import Control.Concurrent (threadDelay, killThread, forkIO)
import Control.Exception.Safe (try, finally, SomeException)
import Data.IORef
import Data.Aeson (eitherDecode, encode)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.ByteString.Lazy as BL
import Data.Time (getCurrentTime)

import Wuss (runSecureClient)
import Network.Socket (HostName)
import Network.WebSockets (ConnectionException(..), Connection,
                           receiveData, sendTextData, sendClose)

import Discord.Internal.Types
import Discord.Internal.Rest.Prelude (apiVersion)


-- | Info the event processing loop needs to
data GatewayHandle = GatewayHandle
  { -- | Realtime events from discord
    gatewayHandleEvents         :: Chan (Either GatewayException EventInternalParse),
    -- | Events the user sends to discord
    gatewayHandleUserSendables  :: Chan GatewaySendable,
    -- | Recent set status (resent to discord on reconnect)
    gatewayHandleLastStatus     :: IORef (Maybe UpdateStatusOpts),
    -- | Recent sent event sequence (used to reconnect)
    gatewayHandleLastSequenceId :: IORef Integer,
    -- | Which discord server session (used to reconnect)
    gatewayHandleSessionId      :: IORef T.Text,
    -- | Which discord gateway to connect to. This should contain a default value
    -- ("gateway.discord.gg") on first connect, but on subsequent Resumes this
    -- may contain a different value. This should never contain trailing slashes,
    -- or any "wss://" prefixes, since HostNames of this kind are not supported
    -- by the websockets library.
    gatewayHandleHostname       :: IORef HostName,
    -- | The last time a heartbeatack was received
    gatewayHandleHeartbeatAckTimes    :: IORef UTCTime,
    -- | The last two times a heartbeat was sent
    gatewayHandleHeartbeatTimes       :: IORef (UTCTime, UTCTime)
  }

-- | Ways the gateway connection can fail with no possibility of recovery.
newtype GatewayException = GatewayExceptionIntent T.Text
  deriving (Show)


-- | State of the eventloop
data LoopState = LoopStart
               | LoopClosed
               | LoopReconnect
  deriving Show

-- | Info the sendableLoop reads when it writes to the websocket
data SendablesData = SendablesData
  { sendableConnection :: Connection
  , librarySendables :: Chan GatewaySendableInternal
  , startsendingUsers :: IORef Bool
  , heartbeatInterval :: Integer
  }


-- | Gateway connection infinite loop. Get events from websocket and send them to the library user
--
-- @
--  Auth                                                         needed to connect
--  GatewayIntent                                                needed to connect
--  GatewayHandle (eventsGives,status,usersends,seq,sesh)        needed all over
--  log :: Chan (T.Text)                                         needed all over
--
--  sendableConnection                                 set by setup,  need sendableLoop
--  librarySendables :: Chan (GatewaySendableInternal) set by setup,  need heartbeat
--  heartbeatInterval :: Int                           set by Hello,  need heartbeat
--
--  sequenceId :: Int id of last event received        set by Resume, need heartbeat and reconnect
--  sessionId :: Text                                  set by Ready,  need reconnect
-- @
connectionLoop :: Auth -> GatewayIntent -> GatewayHandle -> Chan T.Text -> IO ()
connectionLoop auth intent gatewayHandle log = outerloop LoopStart
    where

    -- | Main connection loop. Catch exceptions and reconnect.
    outerloop :: LoopState -> IO ()
    outerloop state = do
        gatewayHost <- readIORef (gatewayHandleHostname gatewayHandle)
        mfirst <- firstmessage state -- construct first message
        case mfirst of
          Nothing -> pure () -- close

          Just message -> do
              nextstate <- try (startOneConnection gatewayHost message)  -- connection
              case nextstate :: Either SomeException LoopState of
                Left _ -> do t <- getRandomR (3,20)
                             threadDelay (t * (10^(6 :: Int)))
                             writeChan log "gateway - trying to reconnect after failure(s)"
                             outerloop LoopReconnect
                Right n -> outerloop n

    -- | Construct the initial websocket message to send based on which state of the loop.
    --   Fresh start is Identify and a reconnect is Resume
    firstmessage :: LoopState -> IO (Maybe GatewaySendableInternal)
    firstmessage state =
      case state of
        LoopStart -> pure $ Just $ Identify auth intent (0, 1)
        LoopReconnect -> do seqId  <- readIORef (gatewayHandleLastSequenceId gatewayHandle)
                            seshId <- readIORef (gatewayHandleSessionId gatewayHandle)
                            if seshId == ""
                            then do writeChan log "gateway - WARNING seshID was not set by READY?"
                                    pure $ Just $ Identify auth intent (0, 1)
                            else pure $ Just $ Resume auth seshId seqId
        LoopClosed -> pure Nothing

    startOneConnection
      :: HostName
      -- ^ The gateway address to connect to. Should be "gateway.discord.gg" on first try, but
      -- all Resumes should go to the resume_gateway_url specified in the Ready event
      -- https://discord.com/developers/docs/change-log#sessionspecific-gateway-resume-urls
      -> GatewaySendableInternal
      -- ^ The first message to send. Either an Identify or Resume.
      -> IO LoopState
    startOneConnection gatewayAddr message = runSecureClient gatewayAddr 443 ("/?v=" <> T.unpack apiVersion <>"&encoding=json") $ \conn -> do
                        msg <- getPayload conn log
                        case msg of
                            Right (Hello interval) -> do
                                -- setup sendables data
                                internal <- newChan :: IO (Chan GatewaySendableInternal)
                                sendingUser <- newIORef False
                                let sending = SendablesData { sendableConnection = conn
                                                            , librarySendables = internal
                                                            , startsendingUsers = sendingUser
                                                            , heartbeatInterval = interval
                                                            }
                                -- start websocket sending loop
                                sendsId <- forkIO $ sendableLoop conn gatewayHandle sending log
                                heart <- forkIO $ heartbeat sending (gatewayHandleHeartbeatTimes gatewayHandle) (gatewayHandleLastSequenceId gatewayHandle)
                                writeChan internal message

                                -- run connection eventloop
                                finally (runEventLoop gatewayHandle sending log)
                                        (killThread heart >> killThread sendsId)

                            _ -> do
                                writeChan log "gateway - WARNING could not connect. Expected hello"
                                sendClose conn ("expected hello" :: BL.ByteString)
                                void $ forever $ void (receiveData conn :: IO BL.ByteString)
                                -- > after sendClose you should call receiveDataMessage until
                                -- > it throws an exception
                                -- haskell websockets documentation
                                threadDelay (3 * (10^(6 :: Int)))
                                pure LoopStart


-- | Process events from discord and write them to the onDiscordEvent Channel
runEventLoop :: GatewayHandle -> SendablesData -> Chan T.Text -> IO LoopState
runEventLoop thehandle sendablesData log = do loop
  where
  eventChan :: Chan (Either GatewayException EventInternalParse)
  eventChan = gatewayHandleEvents thehandle

  -- | Keep receiving Dispatch events until a reconnect or a restart
  loop = do
    eitherPayload <- getPayloadTimeout sendablesData log
    case eitherPayload :: Either ConnectionException GatewayReceivable of

      Right (Dispatch event sq) -> do -- GOT AN EVENT:
                                      writeIORef (gatewayHandleLastSequenceId thehandle) sq
                                      writeChan eventChan (Right event) -- send the event to user
                                      case event of
                                        (InternalReady _ _ _ seshID resumeHost _ _) -> do
                                            writeIORef (gatewayHandleSessionId thehandle) seshID
                                            writeIORef (gatewayHandleHostname thehandle) resumeHost
                                        _ -> writeIORef (startsendingUsers sendablesData) True
                                      loop
      Right (Hello _interval) -> do writeChan log "eventloop - unexpected hello"
                                    loop
      Right (HeartbeatRequest sq) -> do writeIORef (gatewayHandleLastSequenceId thehandle) sq
                                        sendHeartbeat sendablesData (gatewayHandleHeartbeatTimes thehandle) sq
                                        loop
      Right (InvalidSession retry) -> pure $ if retry then LoopReconnect else LoopStart
      Right Reconnect        -> pure LoopReconnect
      Right HeartbeatAck     -> do
        currTime <- getCurrentTime
        _ <- atomicModifyIORef' (gatewayHandleHeartbeatAckTimes thehandle) (dupe . const currTime)
        loop
      Right (ParseError _)   -> loop  -- getPayload logs the parse error. nothing to do here

      Left (CloseRequest code str) -> case code of
          -- see Discord and MDN documentation on gateway close event codes
          -- https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-close-event-codes
          -- https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#properties
          1000 -> pure LoopReconnect
          1001 -> pure LoopReconnect
          4000 -> pure LoopReconnect
          4006 -> pure LoopStart
          4007 -> pure LoopStart
          4014 -> do writeChan eventChan (Left (GatewayExceptionIntent $
                           "Tried to declare an unauthorized GatewayIntent. " <>
                           "Use the discord app manager to authorize by following: " <>
                           "https://github.com/discord-haskell/discord-haskell/blob/master/docs/intents.md"))
                     pure LoopClosed
          _ -> do writeChan log ("gateway - unknown websocket close code " <> T.pack (show code)
                                  <> " [" <> TE.decodeUtf8 (BL.toStrict str) <> "]. Consider opening an issue "
                                  <> "https://github.com/discord-haskell/discord-haskell/issues")
                  pure LoopStart
      Left _ -> pure LoopReconnect


-- | Blocking wait for next payload from the websocket (returns "Reconnect" after 1.5*heartbeatInterval seconds)
getPayloadTimeout :: SendablesData -> Chan T.Text -> IO (Either ConnectionException GatewayReceivable)
getPayloadTimeout sendablesData log = do
  let interval = heartbeatInterval sendablesData
  res <- race (threadDelay (fromInteger ((interval * 1000 * 3) `div` 2)))
              (getPayload (sendableConnection sendablesData) log)
  case res of
    Left () -> pure (Right Reconnect)
    Right other -> pure other

-- | Blocking wait for next payload from the websocket
getPayload :: Connection -> Chan T.Text -> IO (Either ConnectionException GatewayReceivable)
getPayload conn log = try $ do
  msg' <- receiveData conn
  case eitherDecode msg' of
    Right msg -> pure msg
    Left  err -> do writeChan log ("gateway - received exception [" <> T.pack err <> "]"
                                      <> " while decoding " <> TE.decodeUtf8 (BL.toStrict msg'))
                    pure (ParseError (T.pack err))

-- | Infinite loop to send heartbeats to the chan
heartbeat :: SendablesData -> IORef (UTCTime, UTCTime) -> IORef Integer -> IO ()
heartbeat sendablesData sendTimes seqKey = do
  threadDelay (3 * 10^(6 :: Int))
  forever $ do
    num <- readIORef seqKey
    sendHeartbeat sendablesData sendTimes num
    threadDelay (fromInteger (heartbeatInterval sendablesData * 1000))

sendHeartbeat :: SendablesData -> IORef (UTCTime, UTCTime) -> Integer -> IO ()
sendHeartbeat sendablesData sendTimes seqKey = do
  currTime <- getCurrentTime
  _ <- atomicModifyIORef' sendTimes (dupe . (currTime,) . fst)
  writeChan (librarySendables sendablesData) (Heartbeat seqKey)

-- | Infinite loop to send library/user events to discord with the actual websocket connection
sendableLoop :: Connection -> GatewayHandle -> SendablesData -> Chan T.Text -> IO ()
sendableLoop conn ghandle sendablesData _log = sendLoop
  where
  sendLoop = do
   -- send a ~120 events a min by delaying
      threadDelay $ round ((10^(6 :: Int)) * (62 / 120) :: Double)
   -- payload :: Either GatewaySendableInternal GatewaySendable
      payload <- race nextLibrary nextUser
      sendTextData conn (either encode encode payload)
      sendLoop

  -- | next event sent by library
  nextLibrary :: IO GatewaySendableInternal
  nextLibrary = readChan (librarySendables sendablesData)

  -- | next event sent by user (once startsendingUsers is set)
  nextUser :: IO GatewaySendable
  nextUser = do usersending <- readIORef (startsendingUsers sendablesData)
                if usersending
                then readChan (gatewayHandleUserSendables ghandle)
                else threadDelay (4 * (10^(6::Int))) >> nextUser

dupe :: a -> (a, a)
dupe a = (a, a)