Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/checked-cluser-connect-redis
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use checkedConnectCluster to avoid dropping requests to Redis when Gundeck reconnects to the Redis cluster
1 change: 1 addition & 0 deletions services/gundeck/gundeck.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ library
Gundeck.Push.Websocket
Gundeck.React
Gundeck.Redis
Gundeck.Redis.HedisExtensions
Gundeck.Run
Gundeck.ThreadBudget
Gundeck.ThreadBudget.Internal
Expand Down
5 changes: 3 additions & 2 deletions services/gundeck/src/Gundeck/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import qualified Database.Redis as Redis
import qualified Gundeck.Aws as Aws
import Gundeck.Options as Opt
import qualified Gundeck.Redis as Redis
import qualified Gundeck.Redis.HedisExtensions as Redis
import Gundeck.ThreadBudget
import Imports
import Network.HTTP.Client (responseTimeoutMicro)
Expand Down Expand Up @@ -131,7 +132,7 @@ createRedisPool l endpoint identifier = do
. Log.field "connInfo" (show redisConnInfo)
let connectWithRetry = Redis.connectRobust l (capDelay 1000000 (exponentialBackoff 50000))
r <- case endpoint ^. rConnectionMode of
Master -> connectWithRetry $ Redis.connect redisConnInfo
Cluster -> connectWithRetry $ Redis.connectCluster redisConnInfo
Master -> connectWithRetry $ Redis.checkedConnect redisConnInfo
Cluster -> connectWithRetry $ Redis.checkedConnectCluster redisConnInfo
Log.info l $ Log.msg (Log.val $ "Established connection to " <> identifier <> ".")
pure r
42 changes: 17 additions & 25 deletions services/gundeck/src/Gundeck/Redis.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Control.Lens
import qualified Control.Monad.Catch as Catch
import Control.Retry
import Database.Redis
import Gundeck.Redis.HedisExtensions
import Imports
import qualified System.Logger as Log
import System.Logger.Class (MonadLogger)
Expand Down Expand Up @@ -66,40 +67,31 @@ connectRobust ::
Logger ->
-- | e. g., @exponentialBackoff 50000@
RetryPolicy ->
-- | action returning a fresh initial 'Connection', e. g., @(connect connInfo)@ or @(connectCluster connInfo)@
-- | action returning a fresh initial 'Connection', e. g., @(checkedConnect connInfo)@ or @(checkedConnectCluster connInfo)@
IO Connection ->
IO RobustConnection
connectRobust l retryStrategy connectLowLevel = do
robustConnection <- newEmptyMVar @IO @ReConnection
reconnectRedis robustConnection
retry $ reconnectRedis robustConnection
pure robustConnection
where
retry =
recovering -- retry connecting, e. g., with exponential back-off
retryStrategy
[ const $ Catch.Handler (\(e :: ClusterDownError) -> logEx (Log.err l) e "Redis cluster down" >> pure True),
const $ Catch.Handler (\(e :: ConnectError) -> logEx (Log.err l) e "Redis not in cluster mode" >> pure True),
const $ Catch.Handler (\(e :: ConnectTimeout) -> logEx (Log.err l) e "timeout when connecting to Redis" >> pure True),
const $ Catch.Handler (\(e :: ConnectionLostException) -> logEx (Log.err l) e "Redis connection lost during request" >> pure True),
const $ Catch.Handler (\(e :: PingException) -> logEx (Log.err l) e "pinging Redis failed" >> pure True),
const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True)
]
. const -- ignore RetryStatus
reconnectRedis robustConnection = do
Log.info l $ Log.msg (Log.val "connecting to Redis")
conn <- connectLowLevel
Log.info l $ Log.msg (Log.val "successfully connected to Redis")

Log.info l $ Log.msg (Log.val "lazy connection established, running ping...")
-- FUTUREWORK: With ping, we only verify that a single node is running as
-- opposed to verifying that all nodes of the cluster are up and running.
-- It remains unclear how cluster health can be verified in hedis.
void . runRedis conn $ do
res <- ping
case res of
Left r -> throwIO $ PingException r
Right _ -> pure ()
Log.info l $ Log.msg (Log.val "ping went through")

reconnectOnce <-
once $ -- avoid concurrent attempts to reconnect
recovering -- retry connecting, e. g., with exponential back-off
retryStrategy
[ const $ Catch.Handler (\(e :: ConnectError) -> logEx (Log.err l) e "Redis not in cluster mode" >> pure True),
const $ Catch.Handler (\(e :: ConnectTimeout) -> logEx (Log.err l) e "timeout when connecting to Redis" >> pure True),
const $ Catch.Handler (\(e :: ConnectionLostException) -> logEx (Log.err l) e "Redis connection lost during request" >> pure True),
const $ Catch.Handler (\(e :: PingException) -> logEx (Log.err l) e "pinging Redis failed" >> pure True),
const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True)
]
$ const $
reconnectRedis robustConnection
reconnectOnce <- once . retry $ reconnectRedis robustConnection -- avoid concurrent attempts to reconnect
let newReConnection = ReConnection {_rrConnection = conn, _rrReconnect = reconnectOnce}
unlessM (tryPutMVar robustConnection newReConnection) $
void $ swapMVar robustConnection newReConnection
Expand Down
182 changes: 182 additions & 0 deletions services/gundeck/src/Gundeck/Redis/HedisExtensions.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.
module Gundeck.Redis.HedisExtensions
( ClusterInfoResponse (..),
ClusterInfoResponseState (..),
clusterInfo,
checkedConnectCluster,
ClusterDownError,
)
where

import qualified Data.ByteString.Char8 as Char8
import Database.Redis
import Imports hiding (Down)
import UnliftIO

-- https://redis.io/commands/cluster-info/
data ClusterInfoResponse = ClusterInfoResponse
{ clusterInfoResponseState :: ClusterInfoResponseState,
clusterInfoResponseSlotsAssigned :: Integer,
clusterInfoResponseSlotsOK :: Integer,
clusterInfoResponseSlotsPfail :: Integer,
clusterInfoResponseSlotsFail :: Integer,
clusterInfoResponseKnownNodes :: Integer,
clusterInfoResponseSize :: Integer,
clusterInfoResponseCurrentEpoch :: Integer,
clusterInfoResponseMyEpoch :: Integer,
clusterInfoResponseStatsMessagesSent :: Integer,
clusterInfoResponseStatsMessagesReceived :: Integer,
clusterInfoResponseTotalLinksBufferLimitExceeded :: Integer,
clusterInfoResponseStatsMessagesPingSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPingReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesPongSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPongReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesMeetSent :: Maybe Integer,
clusterInfoResponseStatsMessagesMeetReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesFailSent :: Maybe Integer,
clusterInfoResponseStatsMessagesFailReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthReqSent :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthReqReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthAckSent :: Maybe Integer,
clusterInfoResponseStatsMessagesAuthAckReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesUpdateSent :: Maybe Integer,
clusterInfoResponseStatsMessagesUpdateReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesMfstartSent :: Maybe Integer,
clusterInfoResponseStatsMessagesMfstartReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesModuleSent :: Maybe Integer,
clusterInfoResponseStatsMessagesModuleReceived :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishshardSent :: Maybe Integer,
clusterInfoResponseStatsMessagesPublishshardReceived :: Maybe Integer
}
deriving (Show, Eq)

data ClusterInfoResponseState
= OK
| Down
deriving (Show, Eq)

defClusterInfoResponse :: ClusterInfoResponse
defClusterInfoResponse =
ClusterInfoResponse
{ clusterInfoResponseState = Down,
clusterInfoResponseSlotsAssigned = 0,
clusterInfoResponseSlotsOK = 0,
clusterInfoResponseSlotsPfail = 0,
clusterInfoResponseSlotsFail = 0,
clusterInfoResponseKnownNodes = 0,
clusterInfoResponseSize = 0,
clusterInfoResponseCurrentEpoch = 0,
clusterInfoResponseMyEpoch = 0,
clusterInfoResponseStatsMessagesSent = 0,
clusterInfoResponseStatsMessagesReceived = 0,
clusterInfoResponseTotalLinksBufferLimitExceeded = 0,
clusterInfoResponseStatsMessagesPingSent = Nothing,
clusterInfoResponseStatsMessagesPingReceived = Nothing,
clusterInfoResponseStatsMessagesPongSent = Nothing,
clusterInfoResponseStatsMessagesPongReceived = Nothing,
clusterInfoResponseStatsMessagesMeetSent = Nothing,
clusterInfoResponseStatsMessagesMeetReceived = Nothing,
clusterInfoResponseStatsMessagesFailSent = Nothing,
clusterInfoResponseStatsMessagesFailReceived = Nothing,
clusterInfoResponseStatsMessagesPublishSent = Nothing,
clusterInfoResponseStatsMessagesPublishReceived = Nothing,
clusterInfoResponseStatsMessagesAuthReqSent = Nothing,
clusterInfoResponseStatsMessagesAuthReqReceived = Nothing,
clusterInfoResponseStatsMessagesAuthAckSent = Nothing,
clusterInfoResponseStatsMessagesAuthAckReceived = Nothing,
clusterInfoResponseStatsMessagesUpdateSent = Nothing,
clusterInfoResponseStatsMessagesUpdateReceived = Nothing,
clusterInfoResponseStatsMessagesMfstartSent = Nothing,
clusterInfoResponseStatsMessagesMfstartReceived = Nothing,
clusterInfoResponseStatsMessagesModuleSent = Nothing,
clusterInfoResponseStatsMessagesModuleReceived = Nothing,
clusterInfoResponseStatsMessagesPublishshardSent = Nothing,
clusterInfoResponseStatsMessagesPublishshardReceived = Nothing
}

parseClusterInfoResponse :: [[ByteString]] -> ClusterInfoResponse -> Maybe ClusterInfoResponse
parseClusterInfoResponse fields resp = case fields of
[] -> pure resp
(["cluster_state", state] : fs) -> parseState state >>= \s -> parseClusterInfoResponse fs $ resp {clusterInfoResponseState = s}
(["cluster_slots_assigned", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsAssigned = v}
(["cluster_slots_ok", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsOK = v}
(["cluster_slots_pfail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsPfail = v}
(["cluster_slots_fail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsFail = v}
(["cluster_known_nodes", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseKnownNodes = v}
(["cluster_size", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSize = v}
(["cluster_current_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseCurrentEpoch = v}
(["cluster_my_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseMyEpoch = v}
(["cluster_stats_messages_sent", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesSent = v}
(["cluster_stats_messages_received", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesReceived = v}
(["total_cluster_links_buffer_limit_exceeded", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseTotalLinksBufferLimitExceeded = fromMaybe 0 $ parseInteger value} -- this value should be mandatory according to the spec, but isn't necessarily set in Redis 6
(["cluster_stats_messages_ping_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingSent = parseInteger value}
(["cluster_stats_messages_ping_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingReceived = parseInteger value}
(["cluster_stats_messages_pong_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongSent = parseInteger value}
(["cluster_stats_messages_pong_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongReceived = parseInteger value}
(["cluster_stats_messages_meet_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetSent = parseInteger value}
(["cluster_stats_messages_meet_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetReceived = parseInteger value}
(["cluster_stats_messages_fail_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailSent = parseInteger value}
(["cluster_stats_messages_fail_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailReceived = parseInteger value}
(["cluster_stats_messages_publish_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishSent = parseInteger value}
(["cluster_stats_messages_publish_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishReceived = parseInteger value}
(["cluster_stats_messages_auth_req_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqSent = parseInteger value}
(["cluster_stats_messages_auth_req_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqReceived = parseInteger value}
(["cluster_stats_messages_auth_ack_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckSent = parseInteger value}
(["cluster_stats_messages_auth_ack_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckReceived = parseInteger value}
(["cluster_stats_messages_update_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateSent = parseInteger value}
(["cluster_stats_messages_update_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateReceived = parseInteger value}
(["cluster_stats_messages_mfstart_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartSent = parseInteger value}
(["cluster_stats_messages_mfstart_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartReceived = parseInteger value}
(["cluster_stats_messages_module_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleSent = parseInteger value}
(["cluster_stats_messages_module_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleReceived = parseInteger value}
(["cluster_stats_messages_publishshard_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardSent = parseInteger value}
(["cluster_stats_messages_publishshard_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardReceived = parseInteger value}
(_ : fs) -> parseClusterInfoResponse fs resp
where
parseState bs = case bs of
"ok" -> Just OK
"fail" -> Just Down
_ -> Nothing
parseInteger = fmap fst . Char8.readInteger

instance RedisResult ClusterInfoResponse where
decode r@(Bulk (Just bulkData)) =
maybe (Left r) Right
. flip parseClusterInfoResponse defClusterInfoResponse
. map (Char8.split ':' . Char8.takeWhile (/= '\r'))
$ Char8.lines bulkData
decode r = Left r

clusterInfo :: RedisCtx m f => m (f ClusterInfoResponse)
clusterInfo = sendRequest ["CLUSTER", "INFO"]

checkedConnectCluster :: ConnectInfo -> IO Connection
checkedConnectCluster connInfo = do
conn <- connectCluster connInfo
res <- runRedis conn clusterInfo
case res of
Right r -> case clusterInfoResponseState r of
OK -> pure conn
_ -> throwIO $ ClusterDownError r
Left e -> throwIO $ ConnectSelectError e

newtype ClusterDownError = ClusterDownError ClusterInfoResponse deriving (Eq, Show, Typeable)

instance Exception ClusterDownError