diff --git a/changelog.d/0-release-notes/cannon-drain b/changelog.d/0-release-notes/cannon-drain new file mode 100644 index 0000000000..d4e5e936ad --- /dev/null +++ b/changelog.d/0-release-notes/cannon-drain @@ -0,0 +1,2 @@ +The `.cannon.drainTimeout` setting on the wire-server helm chart has been +removed and replaced with `.cannon.config.drainOpts`. \ No newline at end of file diff --git a/changelog.d/2-features/cannon-drain b/changelog.d/2-features/cannon-drain new file mode 100644 index 0000000000..bd117af4d3 --- /dev/null +++ b/changelog.d/2-features/cannon-drain @@ -0,0 +1,4 @@ +Drain websockets in a controlled fashion when cannon receives a SIGTERM or +SIGINT. Instead of waiting for connections to close on their own, the websockets +are now severed at a controlled pace. This allows for quicker rollouts of new +versions. \ No newline at end of file diff --git a/charts/cannon/templates/configmap.yaml b/charts/cannon/templates/configmap.yaml index a7057e26b7..5513c279c0 100644 --- a/charts/cannon/templates/configmap.yaml +++ b/charts/cannon/templates/configmap.yaml @@ -13,6 +13,12 @@ data: gundeck: host: gundeck port: 8080 + + drainOpts: + gracePeriodSeconds: {{ .Values.config.drainOpts.gracePeriodSeconds }} + millisecondsBetweenBatches: {{ .Values.config.drainOpts.millisecondsBetweenBatches }} + minBatchSize: {{ .Values.config.drainOpts.minBatchSize }} + kind: ConfigMap metadata: name: cannon diff --git a/charts/cannon/templates/statefulset.yaml b/charts/cannon/templates/statefulset.yaml index d0397c07a1..e0c56298ab 100644 --- a/charts/cannon/templates/statefulset.yaml +++ b/charts/cannon/templates/statefulset.yaml @@ -30,17 +30,10 @@ spec: annotations: checksum/configmap: {{ include (print .Template.BasePath "/configmap.yaml") . 
| sha256sum }} spec: - terminationGracePeriodSeconds: {{ .Values.drainTimeout }} # should be higher than the sleep duration of preStop + terminationGracePeriodSeconds: {{ add .Values.config.drainOpts.gracePeriodSeconds 5 }} containers: - name: cannon image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - lifecycle: - preStop: - # kubernetes by default immediately sends a SIGTERM to the container, - # which would cause cannon to exit, breaking existing websocket connections. - # Instead we sleep for a day. (SIGTERM is still sent, but after the preStop completes) - exec: - command: ["sleep", {{ .Values.drainTimeout | quote }} ] volumeMounts: - name: empty mountPath: /etc/wire/cannon/externalHost @@ -65,7 +58,7 @@ spec: {{ toYaml .Values.resources | indent 12 }} initContainers: - name: cannon-configurator - image: alpine:3.13.1 + image: alpine:3.15.4 command: - /bin/sh args: diff --git a/charts/cannon/values.yaml b/charts/cannon/values.yaml index 66f87bd931..296435dc1d 100644 --- a/charts/cannon/values.yaml +++ b/charts/cannon/values.yaml @@ -5,6 +5,16 @@ image: pullPolicy: IfNotPresent config: logLevel: Info + + # See also the section 'Controlling the speed of websocket draining during + # cannon pod replacement' in docs/how-to/install/configuration-options.rst + drainOpts: + # The following drains a minimum of 400 connections/second + # for a total of 10000 over 25 seconds + # (if cannon holds more connections, draining will happen at a faster pace) + gracePeriodSeconds: 25 + millisecondsBetweenBatches: 50 + minBatchSize: 20 resources: requests: memory: "256Mi" @@ -16,4 +26,3 @@ service: name: cannon internalPort: 8080 externalPort: 8080 -drainTimeout: 0 diff --git a/deploy/services-demo/conf/cannon.demo-docker.yaml b/deploy/services-demo/conf/cannon.demo-docker.yaml index bdaa2be9e7..2d63eec9cf 100644 --- a/deploy/services-demo/conf/cannon.demo-docker.yaml +++ b/deploy/services-demo/conf/cannon.demo-docker.yaml @@ -7,5 +7,10 @@ gundeck: host: gundeck 
port: 8086 +drainOpts: + gracePeriodSeconds: 1 + millisecondsBetweenBatches: 5 + minBatchSize: 100 + logLevel: Info logNetStrings: false diff --git a/deploy/services-demo/conf/cannon.demo.yaml b/deploy/services-demo/conf/cannon.demo.yaml index 56f6430e58..999988bafb 100644 --- a/deploy/services-demo/conf/cannon.demo.yaml +++ b/deploy/services-demo/conf/cannon.demo.yaml @@ -7,5 +7,10 @@ gundeck: host: 127.0.0.1 port: 8086 +drainOpts: + gracePeriodSeconds: 1 + millisecondsBetweenBatches: 5 + minBatchSize: 100 + logLevel: Info logNetStrings: false diff --git a/docs/src/how-to/install/configuration-options.rst b/docs/src/how-to/install/configuration-options.rst index 791bd403a0..b681b721b1 100644 --- a/docs/src/how-to/install/configuration-options.rst +++ b/docs/src/how-to/install/configuration-options.rst @@ -119,6 +119,50 @@ Keys below ``gundeck.secrets`` belong into ``values/wire-server/secrets.yaml``: After making this change and applying it to gundeck (ensure gundeck pods have restarted to make use of the updated configuration - that should happen automatically), make sure to reset the push token on any mobile devices that you may have in use. +Controlling the speed of websocket draining during cannon pod replacement +------------------------------------------------------------------------- + +The 'cannon' component is responsible for persistent websocket connections. +Normally the default options would slowly and gracefully drain active websocket +connections over a maximum of ``(amount of cannon replicas * 30 seconds)`` during +the deployment of a new wire-server version. This will lead to a very brief +interruption for Wire clients when their client has to re-connect on the +websocket. + +You're not expected to need to change these settings. + +``drainOpts``: Drain websockets in a controlled fashion when cannon receives a +SIGTERM or SIGINT (this happens when a pod is terminated e.g. during rollout +of a new version). 
Instead of waiting for connections to close on their own, +the websockets are now severed at a controlled pace. This allows for quicker +rollouts of new versions. + +There is no way to entirely disable this behaviour, two extreme examples below + +* the quickest way to kill cannon is to set ``gracePeriodSeconds: 1`` and + ``minBatchSize: 100000`` which would sever all connections immediately; but it's + not recommended as you could DDoS yourself by forcing all active clients to + reconnect at the same time. With this, cannon pod replacement takes only 1 + second per pod. +* the slowest way to roll out a new version of cannon without severing websocket + connections for a long time is to set ``minBatchSize: 1``, + ``millisecondsBetweenBatches: 86400000`` and ``gracePeriodSeconds: 86400`` + which would lead to one single websocket connection being closed immediately, + and all others only after 1 day. With this, cannon pod replacement takes a + full day per pod. + +.. code:: yaml + + # overrides for wire-server/values.yaml + cannon: + drainOpts: + # The following defaults drain a minimum of 400 connections/second + # for a total of 10000 over 25 seconds + # (if cannon holds more connections, draining will happen at a faster pace) + gracePeriodSeconds: 25 + millisecondsBetweenBatches: 50 + minBatchSize: 20 + Blocking creation of personal users, new teams -------------------------------------------------------------------------- diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index d488fc427b..d625b5738f 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -89,6 +89,7 @@ library , data-timeout >=0.3 , exceptions >=0.6 , extended + , extra , gundeck-types , hashable >=1.2 , http-types >=0.8 @@ -107,6 +108,8 @@ library , text >=1.1 , tinylog >=0.10 , types-common >=0.16 + , unix + , unliftio , uuid >=1.3 , vector >=0.10 , wai >=3.0 diff --git a/services/cannon/cannon.integration.yaml 
b/services/cannon/cannon.integration.yaml index 0052516ad1..5bd5d2a706 100644 --- a/services/cannon/cannon.integration.yaml +++ b/services/cannon/cannon.integration.yaml @@ -16,5 +16,10 @@ gundeck: host: 127.0.0.1 port: 8086 +drainOpts: + gracePeriodSeconds: 1 + millisecondsBetweenBatches: 500 + minBatchSize: 5 + logLevel: Info logNetStrings: false diff --git a/services/cannon/cannon2.integration.yaml b/services/cannon/cannon2.integration.yaml index d3032eee1d..5c25937652 100644 --- a/services/cannon/cannon2.integration.yaml +++ b/services/cannon/cannon2.integration.yaml @@ -16,5 +16,10 @@ gundeck: host: 127.0.0.1 port: 8086 +drainOpts: + gracePeriodSeconds: 1 + millisecondsBetweenBatches: 5 + minBatchSize: 100 + logLevel: Info logNetStrings: false diff --git a/services/cannon/package.yaml b/services/cannon/package.yaml index dbbe3a73f0..a40cad3fa7 100644 --- a/services/cannon/package.yaml +++ b/services/cannon/package.yaml @@ -26,6 +26,7 @@ library: - data-default >=0.5 - data-timeout >=0.3 - exceptions >=0.6 + - extra - gundeck-types - hashable >=1.2 - http-types >=0.8 @@ -43,6 +44,8 @@ library: - text >=1.1 - tinylog >=0.10 - types-common >=0.16 + - unix + - unliftio - uuid >=1.3 - vector >=0.10 - wai >=3.0 diff --git a/services/cannon/src/Cannon/Dict.hs b/services/cannon/src/Cannon/Dict.hs index 6ba3edea14..765c6043bb 100644 --- a/services/cannon/src/Cannon/Dict.hs +++ b/services/cannon/src/Cannon/Dict.hs @@ -24,6 +24,7 @@ module Cannon.Dict removeIf, lookup, size, + toList, ) where @@ -32,10 +33,11 @@ import Data.SizedHashMap (SizedHashMap) import qualified Data.SizedHashMap as SHM import Data.Vector (Vector, (!)) import qualified Data.Vector as V -import Imports hiding (lookup) +import Imports hiding (lookup, toList) newtype Dict a b = Dict - {_map :: Vector (IORef (SizedHashMap a b))} + { _map :: Vector (IORef (SizedHashMap a b)) + } size :: MonadIO m => Dict a b -> m Int size d = liftIO $ sum <$> mapM (\r -> SHM.size <$> readIORef r) (_map d) @@ -68,6 
+70,12 @@ removeIf f k d = liftIO . atomicModifyIORef' (getSlice k d) $ \m -> lookup :: (Eq a, Hashable a, MonadIO m) => a -> Dict a b -> m (Maybe b) lookup k = liftIO . fmap (SHM.lookup k) . readIORef . getSlice k +toList :: (MonadIO m, Hashable a) => Dict a b -> m [(a, b)] +toList = + fmap (mconcat . V.toList) + . V.mapM (fmap SHM.toList . readIORef) + . _map + ----------------------------------------------------------------------------- -- Internal diff --git a/services/cannon/src/Cannon/Options.hs b/services/cannon/src/Cannon/Options.hs index e28ebbb294..e2117ee8c3 100644 --- a/services/cannon/src/Cannon/Options.hs +++ b/services/cannon/src/Cannon/Options.hs @@ -29,7 +29,12 @@ module Cannon.Options logLevel, logNetStrings, logFormat, + drainOpts, Opts, + gracePeriodSeconds, + millisecondsBetweenBatches, + minBatchSize, + DrainOpts, ) where @@ -60,12 +65,30 @@ makeFields ''Gundeck deriveApiFieldJSON ''Gundeck +data DrainOpts = DrainOpts + { -- | Maximum amount of time draining should take. Must not be set to 0. + _drainOptsGracePeriodSeconds :: Word64, + -- | Maximum amount of time between batches, this speeds up draining in case + -- there are not many users connected. Must not be set to 0. + _drainOptsMillisecondsBetweenBatches :: Word64, + -- | Batch size is calculated considering actual number of websockets and + -- gracePeriod. If this number is too little, '_drainOptsMinBatchSize' is + -- used. 
+ _drainOptsMinBatchSize :: Word64 + } + deriving (Eq, Show, Generic) + +makeFields ''DrainOpts + +deriveApiFieldJSON ''DrainOpts + data Opts = Opts { _optsCannon :: !Cannon, _optsGundeck :: !Gundeck, _optsLogLevel :: !Level, _optsLogNetStrings :: !(Maybe (Last Bool)), - _optsLogFormat :: !(Maybe (Last LogFormat)) + _optsLogFormat :: !(Maybe (Last LogFormat)), + _optsDrainOpts :: DrainOpts } deriving (Eq, Show, Generic) diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index 6d7ca5a644..d72119823e 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -27,7 +27,7 @@ import Cannon.API.Public import Cannon.App (maxPingInterval) import qualified Cannon.Dict as D import Cannon.Options -import Cannon.Types (Cannon, applog, clients, mkEnv, monitor, runCannon', runCannonToServant) +import Cannon.Types (Cannon, applog, clients, env, mkEnv, monitor, runCannon', runCannonToServant) import Cannon.WS hiding (env) import qualified Control.Concurrent.Async as Async import Control.Exception.Safe (catchAny) @@ -48,7 +48,10 @@ import Servant import qualified System.IO.Strict as Strict import qualified System.Logger.Class as LC import qualified System.Logger.Extended as L +import System.Posix.Signals +import qualified System.Posix.Signals as Signals import System.Random.MWC (createSystemRandom) +import UnliftIO.Concurrent (myThreadId, throwTo) import qualified Wire.API.Routes.Internal.Cannon as Internal import Wire.API.Routes.Public.Cannon import Wire.API.Routes.Version.Wai @@ -57,15 +60,16 @@ type CombinedAPI = PublicAPI :<|> Internal.API run :: Opts -> IO () run o = do + when (o ^. drainOpts . millisecondsBetweenBatches == 0) $ + error "drainOpts.millisecondsBetweenBatches must not be set to 0." + when (o ^. drainOpts . gracePeriodSeconds == 0) $ + error "drainOpts.gracePeriodSeconds must not be set to 0." ext <- loadExternal m <- Middleware.metrics g <- L.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. 
logFormat) e <- - mkEnv <$> pure m - <*> pure ext - <*> pure o - <*> pure g - <*> D.empty 128 + mkEnv m ext o g + <$> D.empty 128 <*> newManager defaultManagerSettings {managerConnCount = 128} <*> createSystemRandom <*> mkClock @@ -83,6 +87,9 @@ run o = do server = hoistServer (Proxy @PublicAPI) (runCannonToServant e) publicAPIServer :<|> hoistServer (Proxy @Internal.API) (runCannonToServant e) internalServer + tid <- myThreadId + void $ installHandler sigTERM (signalHandler (env e) tid) Nothing + void $ installHandler sigINT (signalHandler (env e) tid) Nothing runSettings s app `finally` do Async.cancel refreshMetricsThread L.close (applog e) @@ -93,10 +100,20 @@ run o = do loadExternal :: IO ByteString loadExternal = do let extFile = fromMaybe (error "One of externalHost or externalHostFile must be defined") (o ^. cannon . externalHostFile) - fromMaybe (readExternal extFile) (return . encodeUtf8 <$> o ^. cannon . externalHost) + maybe (readExternal extFile) (return . encodeUtf8) (o ^. cannon . externalHost) readExternal :: FilePath -> IO ByteString readExternal f = encodeUtf8 . strip . pack <$> Strict.readFile f +signalHandler :: Env -> ThreadId -> Signals.Handler +signalHandler e mainThread = CatchOnce $ do + runWS e drain + throwTo mainThread SignalledToExit + +data SignalledToExit = SignalledToExit + deriving (Show) + +instance Exception SignalledToExit + refreshMetrics :: Cannon () refreshMetrics = do m <- monitor diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index 47962a8399..4d0caab6f6 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -109,7 +109,7 @@ mkEnv :: Env mkEnv m external o l d p g t = Env m o l d def $ - WS.env external (o ^. cannon . port) (encodeUtf8 $ o ^. gundeck . host) (o ^. gundeck . port) l p d g t + WS.env external (o ^. cannon . port) (encodeUtf8 $ o ^. gundeck . host) (o ^. gundeck . port) l p d g t (o ^. 
drainOpts) runCannon :: Env -> Cannon a -> Request -> IO a runCannon e c r = diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index caf339eed3..2813a8fe69 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -22,6 +22,7 @@ module Cannon.WS WS, env, runWS, + drain, close, mkWebSocket, setRequestId, @@ -50,8 +51,10 @@ import Bilge.RPC import Bilge.Retry import Cannon.Dict (Dict) import qualified Cannon.Dict as D +import Cannon.Options (DrainOpts, gracePeriodSeconds, millisecondsBetweenBatches, minBatchSize) import Conduit import Control.Concurrent.Timeout +import Control.Lens ((^.)) import Control.Monad.Catch import Control.Retry import Data.Aeson hiding (Error, Key) @@ -61,6 +64,7 @@ import qualified Data.ByteString.Lazy as L import Data.Default (def) import Data.Hashable import Data.Id (ClientId, ConnId (..), UserId) +import Data.List.Extra (chunksOf) import Data.Text.Encoding (decodeUtf8) import Data.Timeout (TimeoutUnit (..), (#)) import Gundeck.Types @@ -72,6 +76,7 @@ import Network.WebSockets hiding (Request) import qualified System.Logger as Logger import System.Logger.Class hiding (Error, Settings, close, (.=)) import System.Random.MWC (GenIO, uniform) +import UnliftIO.Async (async, cancel, pooledMapConcurrentlyN_) ----------------------------------------------------------------------------- -- Key @@ -140,7 +145,8 @@ data Env = Env manager :: !Manager, dict :: !(Dict Key Websocket), rand :: !GenIO, - clock :: !Clock + clock :: !Clock, + drainOpts :: DrainOpts } setRequestId :: RequestId -> Env -> Env @@ -185,6 +191,7 @@ env :: Dict Key Websocket -> GenIO -> Clock -> + DrainOpts -> Env env leh lp gh gp = Env leh lp (host gh . port gp $ empty) def @@ -242,6 +249,75 @@ sendMsg message k c = do kb = key2bytes k +-- | Closes all websockets connected to this instance of cannon. +-- +-- This function is not tested anywhere as it is difficult to write an automated +-- test for. 
Some pointers on testing this function: +-- +-- 1. Set values in cannon.integration.yaml for drainOpts such that it drains +-- "slowly", something like: +-- +-- @ +-- {gracePeriodSeconds: 1, millisecondsBetweenBatches: 500, minBatchSize: 5} +-- @ +-- +-- This will ensure that if there are 10 or more websockets open, they get drained +-- in 2 batches of n/2. +-- +-- 2. Write a test in brig or galley using 'bracketRN' function from +-- tasty-cannon. This function doesn't require users to exist. Just pass it n +-- UserIds and threadDelay for a long-ish time. +-- +-- 3. During this threadDelay, send either SIGINT or SIGTERM to the cannon +-- process and use cannon logs to determine what is going on. +-- +-- Example test, which worked at the time of writing this comment: +-- +-- @ +-- testCannonDrain :: Cannon -> Http () +-- testCannonDrain cannon = do +-- users <- replicateM 50 randomId +-- WS.bracketRN cannon users $ \_websockets -> do +-- putStrLn "-------------------> Before delay" +-- threadDelay 100_000_000 +-- putStrLn "-------------------> After delay" +-- putStrLn "-------------------> After bracket" +-- @ +-- +-- Use @pkill -INT -f cannon.integration.yaml@ to send SIGINT to the cannon +-- process. +drain :: WS () +drain = do + opts <- asks drainOpts + websockets <- asks dict + numberOfConns <- fromIntegral <$> D.size websockets + let maxNumberOfBatches = (opts ^. gracePeriodSeconds * 1000) `div` (opts ^. millisecondsBetweenBatches) + computedBatchSize = numberOfConns `div` maxNumberOfBatches + batchSize = max (opts ^. minBatchSize) computedBatchSize + conns <- D.toList websockets + info $ + msg (val "draining all websockets") + . field "numberOfConns" numberOfConns + . field "computedBatchSize" computedBatchSize + . field "minBatchSize" (opts ^. minBatchSize) + . field "batchSize" batchSize + . field "maxNumberOfBatches" maxNumberOfBatches + + -- Sleeps for the grace period + 1 second.
If the sleep completes, it means + -- that draining didn't finish, and we should log that. + timeoutAction <- async $ do + -- Allocate 1 second more than the grace period to allow for overhead of + -- spawning threads. + liftIO $ threadDelay ((opts ^. gracePeriodSeconds) # Second + 1 # Second) + err $ msg (val "Drain grace period expired") . field "gracePeriodSeconds" (opts ^. gracePeriodSeconds) + + for_ (chunksOf (fromIntegral batchSize) conns) $ \batch -> do + -- 16 was chosen with a roll of a fair dice. + void . async $ pooledMapConcurrentlyN_ 16 (uncurry close) batch + liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond) + cancel timeoutAction + info $ msg (val "Draining complete") + close :: Key -> Websocket -> WS () close k c = do let kb = key2bytes k