diff --git a/changelog.d/5-internal/pr-3136 b/changelog.d/5-internal/pr-3136 new file mode 100644 index 0000000000..3f6e05f35c --- /dev/null +++ b/changelog.d/5-internal/pr-3136 @@ -0,0 +1 @@ +Fix a memory leak in `gundeck` when Redis is offline diff --git a/services/gundeck/src/Gundeck/Env.hs b/services/gundeck/src/Gundeck/Env.hs index c7aa7f318b..a161f6c62a 100644 --- a/services/gundeck/src/Gundeck/Env.hs +++ b/services/gundeck/src/Gundeck/Env.hs @@ -24,6 +24,7 @@ import Cassandra (ClientState, Keyspace (..)) import qualified Cassandra as C import qualified Cassandra.Settings as C import Control.AutoUpdate +import Control.Concurrent.Async (Async) import Control.Lens (makeLenses, (^.)) import Control.Retry (capDelay, exponentialBackoff) import Data.Default (def) @@ -65,7 +66,7 @@ makeLenses ''Env schemaVersion :: Int32 schemaVersion = 7 -createEnv :: Metrics -> Opts -> IO Env +createEnv :: Metrics -> Opts -> IO ([Async ()], Env) createEnv m o = do l <- Logger.mkLogger (o ^. optLogLevel) (o ^. optLogNetStrings) (o ^. optLogFormat) c <- @@ -81,13 +82,13 @@ createEnv m o = do managerResponseTimeout = responseTimeoutMicro 5000000 } - r <- createRedisPool l (o ^. optRedis) "main-redis" + (rThread, r) <- createRedisPool l (o ^. optRedis) "main-redis" - rAdditional <- case o ^. optRedisAdditionalWrite of - Nothing -> pure Nothing + (rAdditionalThreads, rAdditional) <- case o ^. optRedisAdditionalWrite of + Nothing -> pure ([], Nothing) Just additionalRedis -> do - rAdd <- createRedisPool l additionalRedis "additional-write-redis" - pure $ Just rAdd + (rAddThread, rAdd) <- createRedisPool l additionalRedis "additional-write-redis" + pure ([rAddThread], Just rAdd) p <- C.init @@ -110,13 +111,13 @@ createEnv m o = do { updateAction = Ms . round . (* 1000) <$> getPOSIXTime } mtbs <- mkThreadBudgetState `mapM` (o ^. optSettings . setMaxConcurrentNativePushes) - pure $! Env def m o l n p r rAdditional a io mtbs + pure $! (rThread : rAdditionalThreads,) $! Env def m o l n p r rAdditional a io mtbs reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg reqIdMsg = ("request" Logger..=) . unRequestId {-# INLINE reqIdMsg #-} -createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO Redis.RobustConnection +createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO (Async (), Redis.RobustConnection) createRedisPool l endpoint identifier = do let redisConnInfo = Redis.defaultConnectInfo diff --git a/services/gundeck/src/Gundeck/Redis.hs b/services/gundeck/src/Gundeck/Redis.hs index 36298b0f67..c8f4b8e946 100644 --- a/services/gundeck/src/Gundeck/Redis.hs +++ b/services/gundeck/src/Gundeck/Redis.hs @@ -1,6 +1,6 @@ +{-# LANGUAGE NumDecimals #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeApplications #-} -- This file is part of the Wire Server implementation. @@ -22,16 +22,13 @@ module Gundeck.Redis ( RobustConnection, - rrConnection, - rrReconnect, connectRobust, runRobust, PingException, ) where -import Control.Concurrent.Extra (once) -import Control.Lens +import Control.Concurrent.Async (Async, async) import qualified Control.Monad.Catch as Catch import Control.Retry import Database.Redis @@ -44,16 +41,7 @@ import System.Logger.Extended import UnliftIO.Exception -- | Connection to Redis which allows reconnecting. -type RobustConnection = MVar ReConnection - -data ReConnection = ReConnection - { -- | established (and potentially breaking) connection to Redis - _rrConnection :: Connection, - -- | action which can be called to reconnect to Redis - _rrReconnect :: IO () - } - -makeLenses ''ReConnection +type RobustConnection = MVar Connection -- | Connection to Redis which can be reestablished on connection errors. -- @@ -69,11 +57,22 @@ connectRobust :: RetryPolicy -> -- | action returning a fresh initial 'Connection', e. g., @(checkedConnect connInfo)@ or @(checkedConnectCluster connInfo)@ IO Connection -> - IO RobustConnection + IO (Async (), RobustConnection) connectRobust l retryStrategy connectLowLevel = do - robustConnection <- newEmptyMVar @IO @ReConnection - retry $ reconnectRedis robustConnection - pure robustConnection + robustConnection <- newEmptyMVar @IO @Connection + thread <- + async $ safeForever $ do + Log.info l $ Log.msg (Log.val "connecting to Redis") + conn <- retry connectLowLevel + Log.info l $ Log.msg (Log.val "successfully connected to Redis") + putMVar robustConnection conn + catch + ( forever $ do + _ <- runRedis conn ping + threadDelay 1e6 + ) + $ \(_ :: SomeException) -> void $ takeMVar robustConnection + pure (thread, robustConnection) where retry = recovering -- retry connecting, e. g., with exponential back-off @@ -86,17 +85,6 @@ connectRobust l retryStrategy connectLowLevel = do const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True) ] . const -- ignore RetryStatus - reconnectRedis robustConnection = do - Log.info l $ Log.msg (Log.val "connecting to Redis") - conn <- connectLowLevel - Log.info l $ Log.msg (Log.val "successfully connected to Redis") - - reconnectOnce <- once . retry $ reconnectRedis robustConnection -- avoid concurrent attempts to reconnect - let newReConnection = ReConnection {_rrConnection = conn, _rrReconnect = reconnectOnce} - unlessM (tryPutMVar robustConnection newReConnection) $ - void $ - swapMVar robustConnection newReConnection - logEx :: Show e => ((Msg -> Msg) -> IO ()) -> e -> ByteString -> IO () logEx lLevel e description = lLevel $ Log.msg (Log.val description) . Log.field "error" (show e) @@ -104,20 +92,20 @@ connectRobust l retryStrategy connectLowLevel = do -- -- Blocks on connection errors as long as the connection is not reestablished. -- Without externally enforcing timeouts, this may lead to leaking threads. -runRobust :: (MonadUnliftIO m, MonadLogger m) => RobustConnection -> Redis a -> m a -runRobust mvar action = do +runRobust :: (MonadUnliftIO m, MonadLogger m, Catch.MonadMask m) => RobustConnection -> Redis a -> m a +runRobust mvar action = retry $ do robustConnection <- readMVar mvar - catches - (liftIO $ runRedis (_rrConnection robustConnection) action) - [ logAndHandle $ Handler (\(_ :: ConnectionLostException) -> reconnectRetry robustConnection), -- Redis connection lost during request - logAndHandle $ Handler (\(_ :: IOException) -> reconnectRetry robustConnection) -- Redis unreachable - ] + liftIO $ runRedis robustConnection action where - reconnectRetry robustConnection = do - liftIO $ _rrReconnect robustConnection - runRobust mvar action - - logAndHandle (Handler handler) = + retryStrategy = capDelay 1000000 (exponentialBackoff 50000) + retry = + recovering -- retry connecting, e. g., with exponential back-off + retryStrategy + [ logAndHandle $ Catch.Handler (\(_ :: ConnectionLostException) -> pure True), + logAndHandle $ Catch.Handler (\(_ :: IOException) -> pure True) + ] + . const -- ignore RetryStatus + logAndHandle (Handler handler) _ = Handler $ \e -> do LogClass.err $ Log.msg (Log.val "Redis connection failed") . Log.field "error" (show e) handler e @@ -125,3 +113,13 @@ runRobust mvar action = do data PingException = PingException Reply deriving (Show) instance Exception PingException + +safeForever :: + forall m. + (MonadUnliftIO m) => + m () -> + m () +safeForever action = + forever $ + action `catchAny` \_ -> do + threadDelay 1e6 -- pause to keep worst-case noise in logs manageable diff --git a/services/gundeck/src/Gundeck/Run.hs b/services/gundeck/src/Gundeck/Run.hs index c8fc2eb908..66062f96e6 100644 --- a/services/gundeck/src/Gundeck/Run.hs +++ b/services/gundeck/src/Gundeck/Run.hs @@ -41,7 +41,6 @@ import qualified Gundeck.Env as Env import Gundeck.Monad import Gundeck.Options import Gundeck.React -import qualified Gundeck.Redis as Redis import Gundeck.ThreadBudget import Imports hiding (head) import Network.Wai as Wai @@ -59,7 +58,7 @@ import Wire.API.Routes.Version.Wai run :: Opts -> IO () run o = do m <- metrics - e <- createEnv m o + (rThreads, e) <- createEnv m o runClient (e ^. cstate) $ versionCheck schemaVersion let l = e ^. applog @@ -74,8 +73,9 @@ run o = do Async.cancel lst Async.cancel wCollectAuth forM_ wtbs Async.cancel - Redis.disconnect . (^. Redis.rrConnection) =<< takeMVar (e ^. rstate) - whenJust (e ^. rstateAdditionalWrite) $ (=<<) (Redis.disconnect . (^. Redis.rrConnection)) . takeMVar + forM_ rThreads Async.cancel + Redis.disconnect =<< takeMVar (e ^. rstate) + whenJust (e ^. rstateAdditionalWrite) $ (=<<) Redis.disconnect . takeMVar Log.close (e ^. applog) where middleware :: Env -> Wai.Middleware diff --git a/services/gundeck/test/integration/Util.hs b/services/gundeck/test/integration/Util.hs index fed87835bd..393486593e 100644 --- a/services/gundeck/test/integration/Util.hs +++ b/services/gundeck/test/integration/Util.hs @@ -24,7 +24,7 @@ withSettingsOverrides f action = do ts <- ask let opts = f (view tsOpts ts) m <- metrics - env <- liftIO $ createEnv m opts + (_rThreads, env) <- liftIO $ createEnv m opts liftIO . lowerCodensity $ do let app = mkApp env p <- withMockServer app