Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/pr-2512
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
retry gundeck's Redis connection in case of network errors such as IP changes or network outages
1 change: 1 addition & 0 deletions services/gundeck/gundeck.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ library
Gundeck.Push.Native.Types
Gundeck.Push.Websocket
Gundeck.React
Gundeck.Redis
Gundeck.Run
Gundeck.ThreadBudget
Gundeck.ThreadBudget.Internal
Expand Down
33 changes: 8 additions & 25 deletions services/gundeck/src/Gundeck/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import qualified Cassandra as C
import qualified Cassandra.Settings as C
import Control.AutoUpdate
import Control.Lens (makeLenses, (^.))
import Control.Retry (capDelay, exponentialBackoff)
import Data.Default (def)
import qualified Data.List.NonEmpty as NE
import Data.Metrics.Middleware (Metrics)
Expand All @@ -35,6 +36,7 @@ import Data.Time.Clock.POSIX
import qualified Database.Redis as Redis
import qualified Gundeck.Aws as Aws
import Gundeck.Options as Opt
import qualified Gundeck.Redis as Redis
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the reviewer: this line effectively pollutes the Redis package namespace in this module, however, I deemed this is justifiable, since the namespace is artificial and functions of both modules logically belong to Redis anyway.

import Gundeck.ThreadBudget
import Imports
import Network.HTTP.Client (responseTimeoutMicro)
Expand All @@ -50,8 +52,8 @@ data Env = Env
_applog :: !Logger.Logger,
_manager :: !Manager,
_cstate :: !ClientState,
_rstate :: !Redis.Connection,
_rstateAdditionalWrite :: !(Maybe Redis.Connection),
_rstate :: !Redis.RobustConnection,
_rstateAdditionalWrite :: !(Maybe Redis.RobustConnection),
_awsEnv :: !Aws.Env,
_time :: !(IO Milliseconds),
_threadBudgetState :: !(Maybe ThreadBudgetState)
Expand Down Expand Up @@ -113,7 +115,7 @@ reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg
reqIdMsg = ("request" Logger..=) . unRequestId
{-# INLINE reqIdMsg #-}

createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO Redis.Connection
createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO Redis.RobustConnection
createRedisPool l endpoint identifier = do
let redisConnInfo =
Redis.defaultConnectInfo
Expand All @@ -127,28 +129,9 @@ createRedisPool l endpoint identifier = do
Log.msg (Log.val $ "starting connection to " <> identifier <> "...")
. Log.field "connectionMode" (show $ endpoint ^. rConnectionMode)
. Log.field "connInfo" (show redisConnInfo)
let connectWithRetry = Redis.connectRobust l (capDelay 1000000 (exponentialBackoff 50000))
r <- case endpoint ^. rConnectionMode of
Master -> Redis.checkedConnect redisConnInfo
Cluster -> checkedConnectCluster l redisConnInfo
Master -> connectWithRetry $ Redis.connect redisConnInfo
Cluster -> connectWithRetry $ Redis.connectCluster redisConnInfo
Log.info l $ Log.msg (Log.val $ "Established connection to " <> identifier <> ".")
pure r

-- | Similar to 'checkedConnect' but for redis cluster:
-- Constructs a 'Connection' pool to a Redis server designated by the
-- given 'ConnectInfo', then tests if the server is actually there.
-- Throws an exception if the connection to the Redis server can't be
-- established.
--
-- Throws 'gundeck: ClusterConnectError (Error "ERR This instance has cluster support disabled")' when the redis server doesn't support cluster mode.
checkedConnectCluster :: Logger.Logger -> Redis.ConnectInfo -> IO Redis.Connection
checkedConnectCluster l connInfo = do
Log.info l $ Log.msg (Log.val "starting connection to redis in cluster mode ...")
conn <- Redis.connectCluster connInfo
Log.info l $ Log.msg (Log.val "lazy connection established, running ping...")
void . Redis.runRedis conn $ do
ping <- Redis.ping
case ping of
Left r -> error ("could not ping redis cluster: " <> show r)
Right _ -> pure ()
Log.info l $ Log.msg (Log.val "ping went through")
pure conn
7 changes: 4 additions & 3 deletions services/gundeck/src/Gundeck/Monad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import Data.Default (def)
import Data.Misc (Milliseconds (..))
import qualified Database.Redis as Redis
import Gundeck.Env
import qualified Gundeck.Redis as Redis
import Imports
import Network.HTTP.Types
import Network.Wai
Expand Down Expand Up @@ -101,7 +102,7 @@ newtype WithDefaultRedis a = WithDefaultRedis {runWithDefaultRedis :: Gundeck a}
instance Redis.MonadRedis WithDefaultRedis where
liftRedis action = do
defaultConn <- view rstate
liftIO $ Redis.runRedis defaultConn action
liftIO $ Redis.runRobust defaultConn action

instance Redis.RedisCtx WithDefaultRedis (Either Redis.Reply) where
returnDecode :: Redis.RedisResult a => Redis.Reply -> WithDefaultRedis (Either Redis.Reply a)
Expand Down Expand Up @@ -130,13 +131,13 @@ newtype WithAdditionalRedis a = WithAdditionalRedis {runWithAdditionalRedis :: G
instance Redis.MonadRedis WithAdditionalRedis where
liftRedis action = do
defaultConn <- view rstate
ret <- liftIO $ Redis.runRedis defaultConn action
ret <- liftIO $ Redis.runRobust defaultConn action

mAdditionalRedisConn <- view rstateAdditionalWrite
liftIO . for_ mAdditionalRedisConn $ \additionalRedisConn ->
-- We just fire and forget this call, as there is not much we can do if
-- this fails.
async $ Redis.runRedis additionalRedisConn action
async $ Redis.runRobust additionalRedisConn action

pure ret

Expand Down
127 changes: 127 additions & 0 deletions services/gundeck/src/Gundeck/Redis.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}

-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Gundeck.Redis
( RobustConnection,
rrConnection,
rrReconnect,
connectRobust,
runRobust,
PingException,
)
where

import Control.Concurrent.Extra (once)
import Control.Lens
import qualified Control.Monad.Catch as Catch
import Control.Retry
import Database.Redis
import Imports
import qualified System.Logger as Log
import System.Logger.Extended
import UnliftIO.Exception

-- | Connection to Redis which allows reconnecting.
type RobustConnection = MVar ReConnection

data ReConnection = ReConnection
{ -- | established (and potentially breaking) connection to Redis
_rrConnection :: Connection,
-- | action which can be called to reconnect to Redis
_rrReconnect :: IO ()
}

makeLenses ''ReConnection

-- | Connection to Redis which can be reestablished on connection errors.
--
-- Reconnecting even when Redis IPs change as long as the DNS name remains
-- constant. The server type (cluster or not) and the connection information of
-- the initial connection are used when reconnecting.
--
-- Throws 'ConnectError', 'ConnectTimeout', 'ConnectionLostException',
-- 'PingException', or 'IOException' if retry policy is finite.
connectRobust ::
Logger ->
-- | e. g., @exponentialBackoff 50000@
RetryPolicy ->
-- | action returning a fresh initial 'Connection', e. g., @(connect connInfo)@ or @(connectCluster connInfo)@
IO Connection ->
IO RobustConnection
connectRobust l retryStrategy connectLowLevel = do
robustConnection <- newEmptyMVar @IO @ReConnection
reconnectRedis robustConnection
pure robustConnection
where
reconnectRedis robustConnection = do
conn <- connectLowLevel

Log.info l $ Log.msg (Log.val "lazy connection established, running ping...")
-- FUTUREWORK: With ping, we only verify that a single node is running as
-- opposed to verifying that all nodes of the cluster are up and running.
-- It remains unclear how cluster health can be verified in hedis.
void . runRedis conn $ do
res <- ping
case res of
Left r -> throwIO $ PingException r
Right _ -> pure ()
Log.info l $ Log.msg (Log.val "ping went through")

reconnectOnce <-
once $ -- avoid concurrent attempts to reconnect
recovering -- retry connecting, e. g., with exponential back-off
retryStrategy
[ const $ Catch.Handler (\(e :: ConnectError) -> logEx (Log.err l) e "Redis not in cluster mode" >> pure True),
const $ Catch.Handler (\(e :: ConnectTimeout) -> logEx (Log.err l) e "timeout when connecting to Redis" >> pure True),
const $ Catch.Handler (\(e :: ConnectionLostException) -> logEx (Log.err l) e "Redis connection lost during request" >> pure True),
const $ Catch.Handler (\(e :: PingException) -> logEx (Log.err l) e "pinging Redis failed" >> pure True),
const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True)
]
$ const $
reconnectRedis robustConnection
let newReConnection = ReConnection {_rrConnection = conn, _rrReconnect = reconnectOnce}
unlessM (tryPutMVar robustConnection newReConnection) $
void $ swapMVar robustConnection newReConnection

-- | Run a 'Redis' action through a 'RobustConnection'.
--
-- Blocks on connection errors as long as the connection is not reestablished.
-- Without externally enforcing timeouts, this may lead to leaking threads.
runRobust :: RobustConnection -> Redis a -> IO a
runRobust mvar action = do
robustConnection <- readMVar mvar
catches
(runRedis (_rrConnection robustConnection) action)
[ Handler (\(_ :: ConnectionLostException) -> reconnectRetry robustConnection), -- Redis connection lost during request
Handler (\(_ :: IOException) -> reconnectRetry robustConnection) -- Redis unreachable
Comment on lines +114 to +115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add logs here? Without this it is a little confusing why the lazy connection starts restarting. Also, it would be nice to know which type of errors are causing a connection restart.

]
where
reconnectRetry robustConnection = do
_rrReconnect robustConnection
runRobust mvar action

logEx :: Show e => ((Msg -> Msg) -> IO ()) -> e -> ByteString -> IO ()
logEx lLevel e description = lLevel $ Log.msg $ Log.val $ description <> ": " <> fromString (show e)

data PingException = PingException Reply deriving (Show)

instance Exception PingException
5 changes: 4 additions & 1 deletion services/gundeck/src/Gundeck/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Cassandra (runClient, shutdown)
import Cassandra.Schema (versionCheck)
import Control.Exception (finally)
import Control.Lens hiding (enum)
import Control.Monad.Extra
import Data.Metrics (Metrics)
import Data.Metrics.AWS (gaugeTokenRemaing)
import Data.Metrics.Middleware (metrics)
Expand All @@ -37,6 +38,7 @@ import qualified Gundeck.Env as Env
import Gundeck.Monad
import Gundeck.Options
import Gundeck.React
import qualified Gundeck.Redis as Redis
import Gundeck.ThreadBudget
import Imports hiding (head)
import Network.Wai as Wai
Expand Down Expand Up @@ -66,7 +68,8 @@ run o = do
Async.cancel lst
Async.cancel wCollectAuth
forM_ wtbs Async.cancel
Redis.disconnect (e ^. rstate)
Redis.disconnect . (^. Redis.rrConnection) =<< takeMVar (e ^. rstate)
whenJust (e ^. rstateAdditionalWrite) $ (=<<) (Redis.disconnect . (^. Redis.rrConnection)) . takeMVar
Log.close (e ^. applog)
where
middleware :: Env -> Wai.Middleware
Expand Down