diff --git a/changelog.d/5-internal/checked-cluser-connect-redis b/changelog.d/5-internal/checked-cluser-connect-redis new file mode 100644 index 0000000000..ad510cfc73 --- /dev/null +++ b/changelog.d/5-internal/checked-cluser-connect-redis @@ -0,0 +1 @@ +use checkedConnectCluster to avoid dropping requests to Redis when Gundeck reconnects to the Redis cluster \ No newline at end of file diff --git a/services/gundeck/gundeck.cabal b/services/gundeck/gundeck.cabal index 46f15eb1de..e73b36ed04 100644 --- a/services/gundeck/gundeck.cabal +++ b/services/gundeck/gundeck.cabal @@ -46,6 +46,7 @@ library Gundeck.Push.Websocket Gundeck.React Gundeck.Redis + Gundeck.Redis.HedisExtensions Gundeck.Run Gundeck.ThreadBudget Gundeck.ThreadBudget.Internal diff --git a/services/gundeck/src/Gundeck/Env.hs b/services/gundeck/src/Gundeck/Env.hs index b9e4b68ea1..96cb36f211 100644 --- a/services/gundeck/src/Gundeck/Env.hs +++ b/services/gundeck/src/Gundeck/Env.hs @@ -37,6 +37,7 @@ import qualified Database.Redis as Redis import qualified Gundeck.Aws as Aws import Gundeck.Options as Opt import qualified Gundeck.Redis as Redis +import qualified Gundeck.Redis.HedisExtensions as Redis import Gundeck.ThreadBudget import Imports import Network.HTTP.Client (responseTimeoutMicro) @@ -131,7 +132,7 @@ createRedisPool l endpoint identifier = do . Log.field "connInfo" (show redisConnInfo) let connectWithRetry = Redis.connectRobust l (capDelay 1000000 (exponentialBackoff 50000)) r <- case endpoint ^. rConnectionMode of - Master -> connectWithRetry $ Redis.connect redisConnInfo - Cluster -> connectWithRetry $ Redis.connectCluster redisConnInfo + Master -> connectWithRetry $ Redis.checkedConnect redisConnInfo + Cluster -> connectWithRetry $ Redis.checkedConnectCluster redisConnInfo Log.info l $ Log.msg (Log.val $ "Established connection to " <> identifier <> ".") pure r diff --git a/services/gundeck/src/Gundeck/Redis.hs b/services/gundeck/src/Gundeck/Redis.hs index 74b116cf37..d8dfa59d36 100644 --- a/services/gundeck/src/Gundeck/Redis.hs +++ b/services/gundeck/src/Gundeck/Redis.hs @@ -35,6 +35,7 @@ import Control.Lens import qualified Control.Monad.Catch as Catch import Control.Retry import Database.Redis +import Gundeck.Redis.HedisExtensions import Imports import qualified System.Logger as Log import System.Logger.Class (MonadLogger) @@ -66,40 +67,31 @@ connectRobust :: Logger -> -- | e. g., @exponentialBackoff 50000@ RetryPolicy -> - -- | action returning a fresh initial 'Connection', e. g., @(connect connInfo)@ or @(connectCluster connInfo)@ + -- | action returning a fresh initial 'Connection', e. g., @(checkedConnect connInfo)@ or @(checkedConnectCluster connInfo)@ IO Connection -> IO RobustConnection connectRobust l retryStrategy connectLowLevel = do robustConnection <- newEmptyMVar @IO @ReConnection - reconnectRedis robustConnection + retry $ reconnectRedis robustConnection pure robustConnection where + retry = + recovering -- retry connecting, e. g., with exponential back-off + retryStrategy + [ const $ Catch.Handler (\(e :: ClusterDownError) -> logEx (Log.err l) e "Redis cluster down" >> pure True), + const $ Catch.Handler (\(e :: ConnectError) -> logEx (Log.err l) e "Redis not in cluster mode" >> pure True), + const $ Catch.Handler (\(e :: ConnectTimeout) -> logEx (Log.err l) e "timeout when connecting to Redis" >> pure True), + const $ Catch.Handler (\(e :: ConnectionLostException) -> logEx (Log.err l) e "Redis connection lost during request" >> pure True), + const $ Catch.Handler (\(e :: PingException) -> logEx (Log.err l) e "pinging Redis failed" >> pure True), + const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True) + ] + . const -- ignore RetryStatus reconnectRedis robustConnection = do + Log.info l $ Log.msg (Log.val "connecting to Redis") conn <- connectLowLevel + Log.info l $ Log.msg (Log.val "successfully connected to Redis") - Log.info l $ Log.msg (Log.val "lazy connection established, running ping...") - -- FUTUREWORK: With ping, we only verify that a single node is running as - -- opposed to verifying that all nodes of the cluster are up and running. - -- It remains unclear how cluster health can be verified in hedis. - void . runRedis conn $ do - res <- ping - case res of - Left r -> throwIO $ PingException r - Right _ -> pure () - Log.info l $ Log.msg (Log.val "ping went through") - - reconnectOnce <- - once $ -- avoid concurrent attempts to reconnect - recovering -- retry connecting, e. g., with exponential back-off - retryStrategy - [ const $ Catch.Handler (\(e :: ConnectError) -> logEx (Log.err l) e "Redis not in cluster mode" >> pure True), - const $ Catch.Handler (\(e :: ConnectTimeout) -> logEx (Log.err l) e "timeout when connecting to Redis" >> pure True), - const $ Catch.Handler (\(e :: ConnectionLostException) -> logEx (Log.err l) e "Redis connection lost during request" >> pure True), - const $ Catch.Handler (\(e :: PingException) -> logEx (Log.err l) e "pinging Redis failed" >> pure True), - const $ Catch.Handler (\(e :: IOException) -> logEx (Log.err l) e "network error when connecting to Redis" >> pure True) - ] - $ const $ - reconnectRedis robustConnection + reconnectOnce <- once . retry $ reconnectRedis robustConnection -- avoid concurrent attempts to reconnect let newReConnection = ReConnection {_rrConnection = conn, _rrReconnect = reconnectOnce} unlessM (tryPutMVar robustConnection newReConnection) $ void $ swapMVar robustConnection newReConnection diff --git a/services/gundeck/src/Gundeck/Redis/HedisExtensions.hs b/services/gundeck/src/Gundeck/Redis/HedisExtensions.hs new file mode 100644 index 0000000000..c102ba9b7d --- /dev/null +++ b/services/gundeck/src/Gundeck/Redis/HedisExtensions.hs @@ -0,0 +1,182 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2022 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . +module Gundeck.Redis.HedisExtensions + ( ClusterInfoResponse (..), + ClusterInfoResponseState (..), + clusterInfo, + checkedConnectCluster, + ClusterDownError, + ) +where + +import qualified Data.ByteString.Char8 as Char8 +import Database.Redis +import Imports hiding (Down) +import UnliftIO + +-- https://redis.io/commands/cluster-info/ +data ClusterInfoResponse = ClusterInfoResponse + { clusterInfoResponseState :: ClusterInfoResponseState, + clusterInfoResponseSlotsAssigned :: Integer, + clusterInfoResponseSlotsOK :: Integer, + clusterInfoResponseSlotsPfail :: Integer, + clusterInfoResponseSlotsFail :: Integer, + clusterInfoResponseKnownNodes :: Integer, + clusterInfoResponseSize :: Integer, + clusterInfoResponseCurrentEpoch :: Integer, + clusterInfoResponseMyEpoch :: Integer, + clusterInfoResponseStatsMessagesSent :: Integer, + clusterInfoResponseStatsMessagesReceived :: Integer, + clusterInfoResponseTotalLinksBufferLimitExceeded :: Integer, + clusterInfoResponseStatsMessagesPingSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPingReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesPongSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPongReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesMeetSent :: Maybe Integer, + clusterInfoResponseStatsMessagesMeetReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesFailSent :: Maybe Integer, + clusterInfoResponseStatsMessagesFailReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthReqSent :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthReqReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthAckSent :: Maybe Integer, + clusterInfoResponseStatsMessagesAuthAckReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesUpdateSent :: Maybe Integer, + clusterInfoResponseStatsMessagesUpdateReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesMfstartSent :: Maybe Integer, + clusterInfoResponseStatsMessagesMfstartReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesModuleSent :: Maybe Integer, + clusterInfoResponseStatsMessagesModuleReceived :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishshardSent :: Maybe Integer, + clusterInfoResponseStatsMessagesPublishshardReceived :: Maybe Integer + } + deriving (Show, Eq) + +data ClusterInfoResponseState + = OK + | Down + deriving (Show, Eq) + +defClusterInfoResponse :: ClusterInfoResponse +defClusterInfoResponse = + ClusterInfoResponse + { clusterInfoResponseState = Down, + clusterInfoResponseSlotsAssigned = 0, + clusterInfoResponseSlotsOK = 0, + clusterInfoResponseSlotsPfail = 0, + clusterInfoResponseSlotsFail = 0, + clusterInfoResponseKnownNodes = 0, + clusterInfoResponseSize = 0, + clusterInfoResponseCurrentEpoch = 0, + clusterInfoResponseMyEpoch = 0, + clusterInfoResponseStatsMessagesSent = 0, + clusterInfoResponseStatsMessagesReceived = 0, + clusterInfoResponseTotalLinksBufferLimitExceeded = 0, + clusterInfoResponseStatsMessagesPingSent = Nothing, + clusterInfoResponseStatsMessagesPingReceived = Nothing, + clusterInfoResponseStatsMessagesPongSent = Nothing, + clusterInfoResponseStatsMessagesPongReceived = Nothing, + clusterInfoResponseStatsMessagesMeetSent = Nothing, + clusterInfoResponseStatsMessagesMeetReceived = Nothing, + clusterInfoResponseStatsMessagesFailSent = Nothing, + clusterInfoResponseStatsMessagesFailReceived = Nothing, + clusterInfoResponseStatsMessagesPublishSent = Nothing, + clusterInfoResponseStatsMessagesPublishReceived = Nothing, + clusterInfoResponseStatsMessagesAuthReqSent = Nothing, + clusterInfoResponseStatsMessagesAuthReqReceived = Nothing, + clusterInfoResponseStatsMessagesAuthAckSent = Nothing, + clusterInfoResponseStatsMessagesAuthAckReceived = Nothing, + clusterInfoResponseStatsMessagesUpdateSent = Nothing, + clusterInfoResponseStatsMessagesUpdateReceived = Nothing, + clusterInfoResponseStatsMessagesMfstartSent = Nothing, + clusterInfoResponseStatsMessagesMfstartReceived = Nothing, + clusterInfoResponseStatsMessagesModuleSent = Nothing, + clusterInfoResponseStatsMessagesModuleReceived = Nothing, + clusterInfoResponseStatsMessagesPublishshardSent = Nothing, + clusterInfoResponseStatsMessagesPublishshardReceived = Nothing + } + +parseClusterInfoResponse :: [[ByteString]] -> ClusterInfoResponse -> Maybe ClusterInfoResponse +parseClusterInfoResponse fields resp = case fields of + [] -> pure resp + (["cluster_state", state] : fs) -> parseState state >>= \s -> parseClusterInfoResponse fs $ resp {clusterInfoResponseState = s} + (["cluster_slots_assigned", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsAssigned = v} + (["cluster_slots_ok", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsOK = v} + (["cluster_slots_pfail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsPfail = v} + (["cluster_slots_fail", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSlotsFail = v} + (["cluster_known_nodes", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseKnownNodes = v} + (["cluster_size", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseSize = v} + (["cluster_current_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseCurrentEpoch = v} + (["cluster_my_epoch", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseMyEpoch = v} + (["cluster_stats_messages_sent", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesSent = v} + (["cluster_stats_messages_received", value] : fs) -> parseInteger value >>= \v -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesReceived = v} + (["total_cluster_links_buffer_limit_exceeded", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseTotalLinksBufferLimitExceeded = fromMaybe 0 $ parseInteger value} -- this value should be mandatory according to the spec, but isn't necessarily set in Redis 6 + (["cluster_stats_messages_ping_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingSent = parseInteger value} + (["cluster_stats_messages_ping_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPingReceived = parseInteger value} + (["cluster_stats_messages_pong_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongSent = parseInteger value} + (["cluster_stats_messages_pong_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPongReceived = parseInteger value} + (["cluster_stats_messages_meet_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetSent = parseInteger value} + (["cluster_stats_messages_meet_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMeetReceived = parseInteger value} + (["cluster_stats_messages_fail_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailSent = parseInteger value} + (["cluster_stats_messages_fail_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesFailReceived = parseInteger value} + (["cluster_stats_messages_publish_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishSent = parseInteger value} + (["cluster_stats_messages_publish_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishReceived = parseInteger value} + (["cluster_stats_messages_auth_req_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqSent = parseInteger value} + (["cluster_stats_messages_auth_req_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthReqReceived = parseInteger value} + (["cluster_stats_messages_auth_ack_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckSent = parseInteger value} + (["cluster_stats_messages_auth_ack_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesAuthAckReceived = parseInteger value} + (["cluster_stats_messages_update_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateSent = parseInteger value} + (["cluster_stats_messages_update_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesUpdateReceived = parseInteger value} + (["cluster_stats_messages_mfstart_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartSent = parseInteger value} + (["cluster_stats_messages_mfstart_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesMfstartReceived = parseInteger value} + (["cluster_stats_messages_module_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleSent = parseInteger value} + (["cluster_stats_messages_module_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesModuleReceived = parseInteger value} + (["cluster_stats_messages_publishshard_sent", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardSent = parseInteger value} + (["cluster_stats_messages_publishshard_received", value] : fs) -> parseClusterInfoResponse fs $ resp {clusterInfoResponseStatsMessagesPublishshardReceived = parseInteger value} + (_ : fs) -> parseClusterInfoResponse fs resp + where + parseState bs = case bs of + "ok" -> Just OK + "fail" -> Just Down + _ -> Nothing + parseInteger = fmap fst . Char8.readInteger + +instance RedisResult ClusterInfoResponse where + decode r@(Bulk (Just bulkData)) = + maybe (Left r) Right + . flip parseClusterInfoResponse defClusterInfoResponse + . map (Char8.split ':' . Char8.takeWhile (/= '\r')) + $ Char8.lines bulkData + decode r = Left r + +clusterInfo :: RedisCtx m f => m (f ClusterInfoResponse) +clusterInfo = sendRequest ["CLUSTER", "INFO"] + +checkedConnectCluster :: ConnectInfo -> IO Connection +checkedConnectCluster connInfo = do + conn <- connectCluster connInfo + res <- runRedis conn clusterInfo + case res of + Right r -> case clusterInfoResponseState r of + OK -> pure conn + _ -> throwIO $ ClusterDownError r + Left e -> throwIO $ ConnectSelectError e + +newtype ClusterDownError = ClusterDownError ClusterInfoResponse deriving (Eq, Show, Typeable) + +instance Exception ClusterDownError