Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/pr-3136
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a memory leak in `gundeck` when Redis is offline
17 changes: 9 additions & 8 deletions services/gundeck/src/Gundeck/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Cassandra (ClientState, Keyspace (..))
import qualified Cassandra as C
import qualified Cassandra.Settings as C
import Control.AutoUpdate
import Control.Concurrent.Async (Async)
import Control.Lens (makeLenses, (^.))
import Control.Retry (capDelay, exponentialBackoff)
import Data.Default (def)
Expand Down Expand Up @@ -65,7 +66,7 @@ makeLenses ''Env
schemaVersion :: Int32
schemaVersion = 7

createEnv :: Metrics -> Opts -> IO Env
createEnv :: Metrics -> Opts -> IO ([Async ()], Env)
createEnv m o = do
l <- Logger.mkLogger (o ^. optLogLevel) (o ^. optLogNetStrings) (o ^. optLogFormat)
c <-
Expand All @@ -81,13 +82,13 @@ createEnv m o = do
managerResponseTimeout = responseTimeoutMicro 5000000
}

r <- createRedisPool l (o ^. optRedis) "main-redis"
(rThread, r) <- createRedisPool l (o ^. optRedis) "main-redis"

rAdditional <- case o ^. optRedisAdditionalWrite of
Nothing -> pure Nothing
(rAdditionalThreads, rAdditional) <- case o ^. optRedisAdditionalWrite of
Nothing -> pure ([], Nothing)
Just additionalRedis -> do
rAdd <- createRedisPool l additionalRedis "additional-write-redis"
pure $ Just rAdd
(rAddThread, rAdd) <- createRedisPool l additionalRedis "additional-write-redis"
pure ([rAddThread], Just rAdd)

p <-
C.init
Expand All @@ -110,13 +111,13 @@ createEnv m o = do
{ updateAction = Ms . round . (* 1000) <$> getPOSIXTime
}
mtbs <- mkThreadBudgetState `mapM` (o ^. optSettings . setMaxConcurrentNativePushes)
pure $! Env def m o l n p r rAdditional a io mtbs
pure $! (rThread : rAdditionalThreads,) $! Env def m o l n p r rAdditional a io mtbs

-- | Add the given request ID to a log message under the @"request"@ field.
reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg
reqIdMsg rid = "request" Logger..= unRequestId rid
{-# INLINE reqIdMsg #-}

createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO Redis.RobustConnection
createRedisPool :: Logger.Logger -> RedisEndpoint -> ByteString -> IO (Async (), Redis.RobustConnection)
createRedisPool l endpoint identifier = do
let redisConnInfo =
Redis.defaultConnectInfo
Expand Down
82 changes: 40 additions & 42 deletions services/gundeck/src/Gundeck/Redis.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE NumDecimals #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}

-- This file is part of the Wire Server implementation.
Expand All @@ -22,16 +22,13 @@

module Gundeck.Redis
( RobustConnection,
rrConnection,
rrReconnect,
connectRobust,
runRobust,
PingException,
)
where

import Control.Concurrent.Extra (once)
import Control.Lens
import Control.Concurrent.Async (Async, async)
import qualified Control.Monad.Catch as Catch
import Control.Retry
import Database.Redis
Expand All @@ -44,16 +41,7 @@ import System.Logger.Extended
import UnliftIO.Exception

-- | Connection to Redis which allows reconnecting.
type RobustConnection = MVar ReConnection

data ReConnection = ReConnection
{ -- | established (and potentially breaking) connection to Redis
_rrConnection :: Connection,
-- | action which can be called to reconnect to Redis
_rrReconnect :: IO ()
}

makeLenses ''ReConnection
type RobustConnection = MVar Connection

-- | Connection to Redis which can be reestablished on connection errors.
--
Expand All @@ -69,11 +57,22 @@ connectRobust ::
RetryPolicy ->
-- | action returning a fresh initial 'Connection', e. g., @(checkedConnect connInfo)@ or @(checkedConnectCluster connInfo)@
IO Connection ->
IO RobustConnection
IO (Async (), RobustConnection)
connectRobust l retryStrategy connectLowLevel = do
robustConnection <- newEmptyMVar @IO @ReConnection
retry $ reconnectRedis robustConnection
pure robustConnection
robustConnection <- newEmptyMVar @IO @Connection
thread <-
async $ safeForever $ do
Log.info l $ Log.msg (Log.val "connecting to Redis")
conn <- retry connectLowLevel
Log.info l $ Log.msg (Log.val "successfully connected to Redis")
putMVar robustConnection conn
catch
( forever $ do
_ <- runRedis conn ping
threadDelay 1e6
)
$ \(_ :: SomeException) -> void $ takeMVar robustConnection
pure (thread, robustConnection)
where
retry =
recovering -- retry connecting, e. g., with exponential back-off
Expand All @@ -86,42 +85,41 @@ connectRobust l retryStrategy connectLowLevel = do
const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True)
]
. const -- ignore RetryStatus
reconnectRedis robustConnection = do
Log.info l $ Log.msg (Log.val "connecting to Redis")
conn <- connectLowLevel
Log.info l $ Log.msg (Log.val "successfully connected to Redis")

reconnectOnce <- once . retry $ reconnectRedis robustConnection -- avoid concurrent attempts to reconnect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially you are saying that `once` was not working? If `once` was working, there should never have been a race for updating the `MVar`. Or rather, the `MVar` would be swapped several times, which could have been optimised by putting `swapMVar` below `once`. Or is there more to it that I haven't grasped in this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's bugging me is that I don't see the memory leak. After all pending Redis actions went through the (admittedly redundant) swapMVar, no memory should be retained. Though, I do see that it seems fixed by this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't see it, which is why I spent 16 hours digging through heap profiles before I tried changing the code 😊 . My best guess is that once isn't safe in a concurrent environment. It's implemented (indirectly) via modifyMVar, which comes with this warning:

This function is only atomic if there are no other producers for this MVar. In other words, it cannot guarantee that, by the time modifyMVar_ gets the chance to write to the MVar, the value of the MVar has not been altered by a write operation from another thread.

which makes me think once is indeed the culprit. But I don't know!

The other difference between the two implementations is that HEAD will still call `runRedis` in `runRobust` on a known-bad connection, and it could be that some of the janky pipelining in hedis is responsible for leaking the stacks here.

let newReConnection = ReConnection {_rrConnection = conn, _rrReconnect = reconnectOnce}
unlessM (tryPutMVar robustConnection newReConnection) $
void $
swapMVar robustConnection newReConnection

-- | Log @description@ as the message text, with the 'show'n value of @e@
-- attached under the @"error"@ field, using the supplied level-specific
-- logging function (e. g., @Log.err l@).
logEx :: Show e => ((Msg -> Msg) -> IO ()) -> e -> ByteString -> IO ()
logEx lLevel e description = lLevel entry
  where
    -- message text first, then the rendered error as a structured field
    entry :: Msg -> Msg
    entry = Log.msg (Log.val description) . Log.field "error" (show e)

-- | Run a 'Redis' action through a 'RobustConnection'.
--
-- Blocks on connection errors as long as the connection is not reestablished.
-- Without externally enforcing timeouts, this may lead to leaking threads.
runRobust :: (MonadUnliftIO m, MonadLogger m) => RobustConnection -> Redis a -> m a
runRobust mvar action = do
runRobust :: (MonadUnliftIO m, MonadLogger m, Catch.MonadMask m) => RobustConnection -> Redis a -> m a
runRobust mvar action = retry $ do
robustConnection <- readMVar mvar
catches
(liftIO $ runRedis (_rrConnection robustConnection) action)
[ logAndHandle $ Handler (\(_ :: ConnectionLostException) -> reconnectRetry robustConnection), -- Redis connection lost during request
logAndHandle $ Handler (\(_ :: IOException) -> reconnectRetry robustConnection) -- Redis unreachable
]
liftIO $ runRedis robustConnection action
where
reconnectRetry robustConnection = do
liftIO $ _rrReconnect robustConnection
runRobust mvar action

logAndHandle (Handler handler) =
retryStrategy = capDelay 1000000 (exponentialBackoff 50000)
retry =
recovering -- retry connecting, e. g., with exponential back-off
retryStrategy
[ logAndHandle $ Catch.Handler (\(_ :: ConnectionLostException) -> pure True),
logAndHandle $ Catch.Handler (\(_ :: IOException) -> pure True)
]
. const -- ignore RetryStatus
logAndHandle (Handler handler) _ =
Handler $ \e -> do
LogClass.err $ Log.msg (Log.val "Redis connection failed") . Log.field "error" (show e)
handler e

-- | Exception type wrapping a Redis 'Reply'.
--
-- NOTE(review): presumably intended to signal an unexpected reply to a
-- @ping@; no visible part of this module throws it -- confirm it is still needed.
data PingException = PingException Reply deriving (Show)

instance Exception PingException

-- | Repeat @action@ indefinitely.
--
-- Whenever @action@ fails with an exception caught by 'catchAny', the
-- exception is discarded and the loop resumes after a one second pause.
safeForever :: forall m. (MonadUnliftIO m) => m () -> m ()
safeForever action = forever (action `catchAny` recover)
  where
    recover :: SomeException -> m ()
    recover _ = threadDelay 1e6 -- pause to keep worst-case noise in logs manageable
8 changes: 4 additions & 4 deletions services/gundeck/src/Gundeck/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import qualified Gundeck.Env as Env
import Gundeck.Monad
import Gundeck.Options
import Gundeck.React
import qualified Gundeck.Redis as Redis
import Gundeck.ThreadBudget
import Imports hiding (head)
import Network.Wai as Wai
Expand All @@ -59,7 +58,7 @@ import Wire.API.Routes.Version.Wai
run :: Opts -> IO ()
run o = do
m <- metrics
e <- createEnv m o
(rThreads, e) <- createEnv m o
runClient (e ^. cstate) $
versionCheck schemaVersion
let l = e ^. applog
Expand All @@ -74,8 +73,9 @@ run o = do
Async.cancel lst
Async.cancel wCollectAuth
forM_ wtbs Async.cancel
Redis.disconnect . (^. Redis.rrConnection) =<< takeMVar (e ^. rstate)
whenJust (e ^. rstateAdditionalWrite) $ (=<<) (Redis.disconnect . (^. Redis.rrConnection)) . takeMVar
forM_ rThreads Async.cancel
Redis.disconnect =<< takeMVar (e ^. rstate)
whenJust (e ^. rstateAdditionalWrite) $ (=<<) Redis.disconnect . takeMVar
Log.close (e ^. applog)
where
middleware :: Env -> Wai.Middleware
Expand Down
2 changes: 1 addition & 1 deletion services/gundeck/test/integration/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ withSettingsOverrides f action = do
ts <- ask
let opts = f (view tsOpts ts)
m <- metrics
env <- liftIO $ createEnv m opts
(_rThreads, env) <- liftIO $ createEnv m opts
liftIO . lowerCodensity $ do
let app = mkApp env
p <- withMockServer app
Expand Down