diff --git a/changelog.d/5-internal/pr-2512 b/changelog.d/5-internal/pr-2512 new file mode 100644 index 0000000000..8a009ef34b --- /dev/null +++ b/changelog.d/5-internal/pr-2512 @@ -0,0 +1 @@ +retry gundeck's Redis connection in case of network errors such as IP changes or network outages \ No newline at end of file diff --git a/services/gundeck/gundeck.cabal b/services/gundeck/gundeck.cabal index 4f67b2c36c..46f15eb1de 100644 --- a/services/gundeck/gundeck.cabal +++ b/services/gundeck/gundeck.cabal @@ -45,6 +45,7 @@ library Gundeck.Push.Native.Types Gundeck.Push.Websocket Gundeck.React + Gundeck.Redis Gundeck.Run Gundeck.ThreadBudget Gundeck.ThreadBudget.Internal diff --git a/services/gundeck/src/Gundeck/Env.hs b/services/gundeck/src/Gundeck/Env.hs index 0d66896a2d..b9e4b68ea1 100644 --- a/services/gundeck/src/Gundeck/Env.hs +++ b/services/gundeck/src/Gundeck/Env.hs @@ -25,6 +25,7 @@ import qualified Cassandra as C import qualified Cassandra.Settings as C import Control.AutoUpdate import Control.Lens (makeLenses, (^.)) +import Control.Retry (capDelay, exponentialBackoff) import Data.Default (def) import qualified Data.List.NonEmpty as NE import Data.Metrics.Middleware (Metrics) @@ -35,6 +36,7 @@ import Data.Time.Clock.POSIX import qualified Database.Redis as Redis import qualified Gundeck.Aws as Aws import Gundeck.Options as Opt +import qualified Gundeck.Redis as Redis import Gundeck.ThreadBudget import Imports import Network.HTTP.Client (responseTimeoutMicro) @@ -50,8 +52,8 @@ data Env = Env _applog :: !Logger.Logger, _manager :: !Manager, _cstate :: !ClientState, - _rstate :: !Redis.Connection, - _rstateAdditionalWrite :: !(Maybe Redis.Connection), + _rstate :: !Redis.RobustConnection, + _rstateAdditionalWrite :: !(Maybe Redis.RobustConnection), _awsEnv :: !Aws.Env, _time :: !(IO Milliseconds), _threadBudgetState :: !(Maybe ThreadBudgetState) @@ -113,7 +115,7 @@ reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg reqIdMsg = ("request" Logger..=) . 
unRequestId {-# INLINE reqIdMsg #-} -createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO Redis.Connection +createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO Redis.RobustConnection createRedisPool l endpoint identifier = do let redisConnInfo = Redis.defaultConnectInfo @@ -127,28 +129,9 @@ createRedisPool l endpoint identifier = do Log.msg (Log.val $ "starting connection to " <> identifier <> "...") . Log.field "connectionMode" (show $ endpoint ^. rConnectionMode) . Log.field "connInfo" (show redisConnInfo) + let connectWithRetry = Redis.connectRobust l (capDelay 1000000 (exponentialBackoff 50000)) r <- case endpoint ^. rConnectionMode of - Master -> Redis.checkedConnect redisConnInfo - Cluster -> checkedConnectCluster l redisConnInfo + Master -> connectWithRetry $ Redis.connect redisConnInfo + Cluster -> connectWithRetry $ Redis.connectCluster redisConnInfo Log.info l $ Log.msg (Log.val $ "Established connection to " <> identifier <> ".") pure r - --- | Similar to 'checkedConnect' but for redis cluster: --- Constructs a 'Connection' pool to a Redis server designated by the --- given 'ConnectInfo', then tests if the server is actually there. --- Throws an exception if the connection to the Redis server can't be --- established. --- --- Throws 'gundeck: ClusterConnectError (Error "ERR This instance has cluster support disabled")' when the redis server doesn't support cluster mode. -checkedConnectCluster :: Logger.Logger -> Redis.ConnectInfo -> IO Redis.Connection -checkedConnectCluster l connInfo = do - Log.info l $ Log.msg (Log.val "starting connection to redis in cluster mode ...") - conn <- Redis.connectCluster connInfo - Log.info l $ Log.msg (Log.val "lazy connection established, running ping...") - void . 
Redis.runRedis conn $ do - ping <- Redis.ping - case ping of - Left r -> error ("could not ping redis cluster: " <> show r) - Right _ -> pure () - Log.info l $ Log.msg (Log.val "ping went through") - pure conn diff --git a/services/gundeck/src/Gundeck/Monad.hs b/services/gundeck/src/Gundeck/Monad.hs index a95f5d9c36..120295e1f6 100644 --- a/services/gundeck/src/Gundeck/Monad.hs +++ b/services/gundeck/src/Gundeck/Monad.hs @@ -54,6 +54,7 @@ import Data.Default (def) import Data.Misc (Milliseconds (..)) import qualified Database.Redis as Redis import Gundeck.Env +import qualified Gundeck.Redis as Redis import Imports import Network.HTTP.Types import Network.Wai @@ -101,7 +102,7 @@ newtype WithDefaultRedis a = WithDefaultRedis {runWithDefaultRedis :: Gundeck a} instance Redis.MonadRedis WithDefaultRedis where liftRedis action = do defaultConn <- view rstate - liftIO $ Redis.runRedis defaultConn action + liftIO $ Redis.runRobust defaultConn action instance Redis.RedisCtx WithDefaultRedis (Either Redis.Reply) where returnDecode :: Redis.RedisResult a => Redis.Reply -> WithDefaultRedis (Either Redis.Reply a) @@ -130,13 +131,13 @@ newtype WithAdditionalRedis a = WithAdditionalRedis {runWithAdditionalRedis :: G instance Redis.MonadRedis WithAdditionalRedis where liftRedis action = do defaultConn <- view rstate - ret <- liftIO $ Redis.runRedis defaultConn action + ret <- liftIO $ Redis.runRobust defaultConn action mAdditionalRedisConn <- view rstateAdditionalWrite liftIO . for_ mAdditionalRedisConn $ \additionalRedisConn -> -- We just fire and forget this call, as there is not much we can do if -- this fails. 
- async $ Redis.runRedis additionalRedisConn action + async $ Redis.runRobust additionalRedisConn action pure ret diff --git a/services/gundeck/src/Gundeck/Redis.hs b/services/gundeck/src/Gundeck/Redis.hs new file mode 100644 index 0000000000..efaa8f01e6 --- /dev/null +++ b/services/gundeck/src/Gundeck/Redis.hs @@ -0,0 +1,127 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeApplications #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2022 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Gundeck.Redis + ( RobustConnection, + rrConnection, + rrReconnect, + connectRobust, + runRobust, + PingException, + ) +where + +import Control.Concurrent.Extra (once) +import Control.Lens +import qualified Control.Monad.Catch as Catch +import Control.Retry +import Database.Redis +import Imports +import qualified System.Logger as Log +import System.Logger.Extended +import UnliftIO.Exception + +-- | Connection to Redis which allows reconnecting. 
+type RobustConnection = MVar ReConnection
+
+-- | A Redis connection paired with the means to replace it.
+data ReConnection = ReConnection
+  { -- | the connection currently in use (it may break at any time)
+    _rrConnection :: Connection,
+    -- | run this to establish a replacement connection to Redis
+    _rrReconnect :: IO ()
+  }
+
+makeLenses ''ReConnection
+
+-- | Open a Redis connection that can be reestablished after connection errors.
+--
+-- Reconnecting reuses the given connect action, so the server type (cluster or
+-- not) and the connection information stay those of the initial connection.
+-- This keeps working when Redis IPs change, as long as the DNS name remains
+-- constant.
+--
+-- Every (re)connection attempt is verified with a @PING@. The first attempt is
+-- made directly, so its failure propagates to the caller; later attempts are
+-- retried according to the retry policy.
+--
+-- Throws 'ConnectError', 'ConnectTimeout', 'ConnectionLostException',
+-- 'PingException', or 'IOException' if retry policy is finite.
+connectRobust ::
+  Logger ->
+  -- | policy for retrying reconnects, e. g., @exponentialBackoff 50000@
+  RetryPolicy ->
+  -- | action yielding a fresh 'Connection', e. g., @(connect connInfo)@ or @(connectCluster connInfo)@
+  IO Connection ->
+  IO RobustConnection
+connectRobust l retryStrategy connectLowLevel = do
+  holder <- newEmptyMVar @IO @ReConnection
+  establish holder
+  pure holder
+  where
+    logInfo :: ByteString -> IO ()
+    logInfo text = Log.info l $ Log.msg (Log.val text)
+
+    -- Log the failure at error level and tell 'recovering' to try again.
+    retryAfterLogging :: Show e => e -> ByteString -> IO Bool
+    retryAfterLogging e description = do
+      logEx (Log.err l) e description
+      pure True
+
+    -- Exceptions after which a reconnect attempt is repeated.
+    retryHandlers :: [RetryStatus -> Catch.Handler IO Bool]
+    retryHandlers =
+      [ const (Catch.Handler (\(e :: ConnectError) -> retryAfterLogging e "Redis not in cluster mode")),
+        const (Catch.Handler (\(e :: ConnectTimeout) -> retryAfterLogging e "timeout when connecting to Redis")),
+        const (Catch.Handler (\(e :: ConnectionLostException) -> retryAfterLogging e "Redis connection lost during request")),
+        const (Catch.Handler (\(e :: PingException) -> retryAfterLogging e "pinging Redis failed")),
+        const (Catch.Handler (\(e :: IOException) -> retryAfterLogging e "network error when connecting to Redis"))
+      ]
+
+    -- Connect, verify the connection with a ping, and publish it in the MVar
+    -- together with an action that repeats this whole procedure.
+    establish :: RobustConnection -> IO ()
+    establish holder = do
+      conn <- connectLowLevel
+
+      logInfo "lazy connection established, running ping..."
+      -- FUTUREWORK: A ping only shows that a single node is running; it does
+      -- not verify that all nodes of the cluster are up. It remains unclear
+      -- how cluster health can be verified in hedis.
+      runRedis conn $
+        ping >>= either (throwIO . PingException) (const (pure ()))
+      logInfo "ping went through"
+
+      -- 'once' avoids concurrent attempts to reconnect; 'recovering' retries
+      -- connecting, e. g., with exponential back-off.
+      reconnectOnce <-
+        once (recovering retryStrategy retryHandlers (\_status -> establish holder))
+      let fresh = ReConnection {_rrConnection = conn, _rrReconnect = reconnectOnce}
+      -- Fill the MVar on the first connect; replace its content on reconnects.
+      -- NOTE(review): the superseded connection is not explicitly disconnected
+      -- here -- confirm that this is intended.
+      stored <- tryPutMVar holder fresh
+      unless stored . void $ swapMVar holder fresh
+
+-- | Execute a 'Redis' action on the connection currently held by a
+-- 'RobustConnection'.
+--
+-- On connection errors this blocks until the connection is reestablished.
+-- Unless timeouts are enforced externally, this may lead to leaking threads.
+runRobust :: RobustConnection -> Redis a -> IO a
+runRobust mvar action = do
+  current <- readMVar mvar
+  -- Recovery shared by both handlers: reconnect through the snapshot that
+  -- failed, then start over with whatever connection the MVar holds by then.
+  let reconnectThenRetry = _rrReconnect current >> runRobust mvar action
+  runRedis (_rrConnection current) action
+    `catches` [ -- Redis connection lost during request
+                Handler (\(_ :: ConnectionLostException) -> reconnectThenRetry),
+                -- Redis unreachable
+                Handler (\(_ :: IOException) -> reconnectThenRetry)
+              ]
+
+-- | Log an exception through the given log function, prefixed with a short
+-- description of the situation in which it occurred.
+logEx :: Show e => ((Msg -> Msg) -> IO ()) -> e -> ByteString -> IO ()
+logEx lLevel e description =
+  lLevel (Log.msg (Log.val rendered))
+  where
+    rendered = description <> ": " <> fromString (show e)
+
+-- | Thrown when the ping issued right after connecting returns an error reply.
+data PingException = PingException Reply deriving (Show)
+
+instance Exception PingException
diff --git a/services/gundeck/src/Gundeck/Run.hs b/services/gundeck/src/Gundeck/Run.hs
index 51742bac5f..f25fcc47c4 100644
--- a/services/gundeck/src/Gundeck/Run.hs
+++ b/services/gundeck/src/Gundeck/Run.hs
@@ -24,6 +24,7 @@ import Cassandra (runClient, shutdown)
 import Cassandra.Schema (versionCheck)
 import Control.Exception (finally)
 import Control.Lens hiding (enum)
+import Control.Monad.Extra
 import Data.Metrics (Metrics)
 import Data.Metrics.AWS (gaugeTokenRemaing)
 import Data.Metrics.Middleware (metrics)
@@ -37,6 +38,7 @@ import qualified Gundeck.Env as Env
 import Gundeck.Monad
 import Gundeck.Options
 import Gundeck.React
+import qualified Gundeck.Redis as Redis
 import Gundeck.ThreadBudget
 import Imports hiding (head)
 import Network.Wai as Wai
@@ -66,7 +68,8 @@ run o = do
     Async.cancel lst
     Async.cancel wCollectAuth
     forM_ wtbs Async.cancel
-    Redis.disconnect (e ^. rstate)
+    Redis.disconnect . (^. Redis.rrConnection) =<< takeMVar (e ^. rstate)
+    whenJust (e ^. rstateAdditionalWrite) $ (=<<) (Redis.disconnect . (^. Redis.rrConnection)) . takeMVar
     Log.close (e ^. applog)
   where
     middleware :: Env -> Wai.Middleware