From 5b73dfe0fd7c9f882c5e6887dc19828726166e87 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Tue, 5 Nov 2024 10:03:58 +0100 Subject: [PATCH 01/31] Initial implementation of rabbitmq pool --- libs/extended/src/Network/AMQP/Extended.hs | 2 + services/cannon/cannon.cabal | 2 + services/cannon/src/Cannon/RabbitMq.hs | 218 +++++++++++++++++++++ 3 files changed, 222 insertions(+) create mode 100644 services/cannon/src/Cannon/RabbitMq.hs diff --git a/libs/extended/src/Network/AMQP/Extended.hs b/libs/extended/src/Network/AMQP/Extended.hs index 4aa48aefc5b..1453f3909e4 100644 --- a/libs/extended/src/Network/AMQP/Extended.hs +++ b/libs/extended/src/Network/AMQP/Extended.hs @@ -11,6 +11,8 @@ module Network.AMQP.Extended demoteOpts, RabbitMqTlsOpts (..), mkConnectionOpts, + mkTLSSettings, + readCredsFromEnv, ) where diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index 4091540a846..cefc2da8054 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -23,6 +23,7 @@ library Cannon.App Cannon.Dict Cannon.Options + Cannon.RabbitMq Cannon.RabbitMqConsumerApp Cannon.Run Cannon.Types @@ -111,6 +112,7 @@ library , strict >=0.3.2 , text >=1.1 , tinylog >=0.10 + , transformers , types-common >=0.16 , unix , unliftio diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs new file mode 100644 index 00000000000..17b25423e27 --- /dev/null +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -0,0 +1,218 @@ +{-# OPTIONS -Wwarn #-} +{-# LANGUAGE RecordWildCards #-} + +module Cannon.RabbitMq + ( RabbitMqPoolException, + RabbitMqPoolOptions (..), + RabbitMqPool, + createRabbitMqPool, + RabbitMqChannel, + createChannel, + getMessage, + ) +where + +import Control.Concurrent.Async +import Control.Exception +import Control.Monad.Codensity +import Control.Monad.Trans.Except +import Control.Retry +import Data.List.Extra +import Imports +import Network.AMQP qualified as Q +import Network.AMQP.Extended +import System.Logger (Logger) +import System.Logger qualified as Log + +data RabbitMqPoolException + = TooManyChannels + | ChannelClosed + deriving (Eq, Show) + +instance Exception RabbitMqPoolException + +data PooledConnection = PooledConnection + { connId :: Word64, + inner :: Q.Connection, + numChannels :: !Int + } + +data RabbitMqPool = RabbitMqPool + { opts :: RabbitMqPoolOptions, + nextId :: TVar Word64, + -- TODO: use a priority queue + connections :: TVar [PooledConnection], + logger :: Logger, + deadVar :: MVar () + } + +data RabbitMqPoolOptions = RabbitMqPoolOptions + { maxConnections :: Int, + maxChannels :: Int, + endpoint :: AmqpEndpoint + } + +createRabbitMqPool :: RabbitMqPoolOptions -> Logger -> Codensity IO RabbitMqPool +createRabbitMqPool opts logger = Codensity $ bracket create destroy + where + create = do + deadVar <- newEmptyMVar + (nextId, connections) <- + atomically $ + (,) <$> newTVar 0 <*> newTVar [] + let pool = RabbitMqPool {..} + -- create one connection + void $ createConnection pool + pure pool + destroy pool = putMVar pool.deadVar () + +createConnection :: RabbitMqPool -> IO Q.Connection +createConnection pool = mask_ $ do + conn <- openConnection pool + pconn <- atomically $ do + connId <- readTVar pool.nextId + writeTVar pool.nextId $! succ connId + let c = + PooledConnection + { connId = connId, + numChannels = 0, + inner = conn + } + modifyTVar pool.connections (c :) + pure c + + closedVar <- newEmptyMVar + -- Fire and forget: the thread will terminate by itself as soon as the + -- connection is closed (or if the pool is destroyed). + -- Asynchronous exception safety is guaranteed because exceptions are masked + -- in this whole block. + void . async $ do + v <- race (takeMVar closedVar) (readMVar pool.deadVar) + when (isRight v) $ + -- close connection and ignore exceptions + catch @SomeException (Q.closeConnection conn) $ + \_ -> pure () + atomically $ do + conns <- readTVar pool.connections + writeTVar pool.connections $ + filter (\c -> c.connId /= pconn.connId) conns + Q.addConnectionClosedHandler conn True $ do + putMVar closedVar () + pure conn + +openConnection :: RabbitMqPool -> IO Q.Connection +openConnection pool = do + (username, password) <- readCredsFromEnv + recovering + rabbitMqRetryPolicy + ( skipAsyncExceptions + <> [logRetries (const $ pure True) (logConnectionError pool.logger)] + ) + ( const $ do + Log.info pool.logger $ + Log.msg (Log.val "Trying to connect to RabbitMQ") + mTlsSettings <- + traverse + (liftIO . (mkTLSSettings pool.opts.endpoint.host)) + pool.opts.endpoint.tls + liftIO $ + Q.openConnection'' $ + Q.defaultConnectionOpts + { Q.coServers = + [ ( pool.opts.endpoint.host, + fromIntegral pool.opts.endpoint.port + ) + ], + Q.coVHost = pool.opts.endpoint.vHost, + Q.coAuth = [Q.plain username password], + Q.coTLSSettings = fmap Q.TLSCustom mTlsSettings + } + ) + +data RabbitMqChannel = RabbitMqChannel + { inner :: MVar Q.Channel, + msgVar :: MVar (Maybe (Q.Message, Q.Envelope)) + } + +getMessage :: RabbitMqChannel -> IO (Q.Message, Q.Envelope) +getMessage chan = takeMVar chan.msgVar >>= maybe (throwIO ChannelClosed) pure + +createChannel :: RabbitMqPool -> Text -> IO RabbitMqChannel +createChannel pool queue = do + closedVar <- newEmptyMVar + inner <- newEmptyMVar + msgVar <- newEmptyMVar + let manageChannel = do + conn <- acquireConnection pool + chan <- Q.openChannel conn.inner + void $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do + putMVar msgVar (Just (message, envelope)) + + Q.addChannelExceptionHandler chan $ \e -> do + releaseConnection pool conn + + retry <- case (Q.isNormalChannelClose e, fromException e) of + (True, _) -> do + Log.info pool.logger $ + Log.msg (Log.val "RabbitMQ channel is closed normally, not attempting to reopen channel") + pure False + (_, Just (Q.ConnectionClosedException {})) -> do + Log.info pool.logger $ + Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel") + pure False + _ -> do + logException pool.logger "RabbitMQ channel closed" e + pure True + + putMVar closedVar retry + + retry <- takeMVar closedVar + if retry + then manageChannel + else putMVar msgVar Nothing + + -- TODO: leaking async + void . mask_ $ async manageChannel + pure RabbitMqChannel {inner = inner, msgVar = msgVar} + +acquireConnection :: RabbitMqPool -> IO PooledConnection +acquireConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do + conns <- lift $ readTVar pool.connections + let pconn = minimumOn (.numChannels) conns + -- TODO: spawn new connection if possible + when (pconn.numChannels >= pool.opts.maxChannels) $ + throwE TooManyChannels + + let pconn' = + pconn' + { numChannels = + max pool.opts.maxChannels (succ (numChannels pconn)) + } + lift $ + writeTVar pool.connections $! + map (\c -> if c.connId == pconn'.connId then pconn' else c) conns + pure pconn' + +releaseConnection :: RabbitMqPool -> PooledConnection -> IO () +releaseConnection pool conn = atomically $ do + modifyTVar pool.connections $ map $ \c -> + if c.connId == conn.connId + then c {numChannels = pred (numChannels c)} + else c + +logConnectionError :: Logger -> Bool -> SomeException -> RetryStatus -> IO () +logConnectionError l willRetry e retryStatus = do + Log.err l $ + Log.msg (Log.val "Failed to connect to RabbitMQ") + . Log.field "error" (displayException @SomeException e) + . Log.field "willRetry" willRetry + . Log.field "retryCount" retryStatus.rsIterNumber + +logException :: (MonadIO m) => Logger -> String -> SomeException -> m () +logException l m (SomeException e) = do + Log.err l $ + Log.msg m + . Log.field "error" (displayException e) + +rabbitMqRetryPolicy :: RetryPolicyM IO +rabbitMqRetryPolicy = limitRetriesByCumulativeDelay 1_000_000 $ fullJitterBackoff 1000 From 69441828c4d08ef68503413e048f44f1308b2a1c Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Wed, 6 Nov 2024 10:49:56 +0100 Subject: [PATCH 02/31] Set and unset inner channel --- services/cannon/src/Cannon/RabbitMq.hs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 17b25423e27..1e2c8865c3d 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -145,10 +145,12 @@ createChannel pool queue = do let manageChannel = do conn <- acquireConnection pool chan <- Q.openChannel conn.inner + putMVar inner chan void $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do putMVar msgVar (Just (message, envelope)) Q.addChannelExceptionHandler chan $ \e -> do + void $ takeMVar inner releaseConnection pool conn retry <- case (Q.isNormalChannelClose e, fromException e) of @@ -159,7 +161,7 @@ createChannel pool queue = do (_, Just (Q.ConnectionClosedException {})) -> do Log.info pool.logger $ Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel") - pure False + pure False -- TODO: change this to true? _ -> do logException pool.logger "RabbitMQ channel closed" e pure True @@ -168,7 +170,8 @@ createChannel pool queue = do retry <- takeMVar closedVar if retry - then manageChannel + then -- TODO: exponential backoff? + manageChannel else putMVar msgVar Nothing -- TODO: leaking async From bfc07cfc56903058408a80470678bccf712dc994 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Wed, 6 Nov 2024 14:22:37 +0100 Subject: [PATCH 03/31] Integrate rabbitmq pool into cannon --- services/cannon/src/Cannon/RabbitMq.hs | 8 +- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 90 +++++++------------ services/cannon/src/Cannon/Run.hs | 53 ++++++----- services/cannon/src/Cannon/Types.hs | 31 ++++++- services/cannon/src/Cannon/WS.hs | 8 +- 5 files changed, 100 insertions(+), 90 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 1e2c8865c3d..f8eed7e7ab7 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -6,9 +6,10 @@ module Cannon.RabbitMq RabbitMqPoolOptions (..), RabbitMqPool, createRabbitMqPool, - RabbitMqChannel, + RabbitMqChannel (..), createChannel, getMessage, + ackMessage, ) where @@ -137,6 +138,11 @@ data RabbitMqChannel = RabbitMqChannel getMessage :: RabbitMqChannel -> IO (Q.Message, Q.Envelope) getMessage chan = takeMVar chan.msgVar >>= maybe (throwIO ChannelClosed) pure +ackMessage :: RabbitMqChannel -> Word64 -> Bool -> IO () +ackMessage chan deliveryTag multiple = do + inner <- readMVar chan.inner + Q.ackMsg inner deliveryTag multiple + createChannel :: RabbitMqPool -> Text -> IO RabbitMqChannel createChannel pool queue = do closedVar <- newEmptyMVar diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 112b7ad8d2a..4c4d09ae529 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -5,6 +5,7 @@ module Cannon.RabbitMqConsumerApp where import Cannon.App (rejectOnError) import Cannon.Dict qualified as D import Cannon.Options +import Cannon.RabbitMq import Cannon.WS hiding (env) import Cassandra as C hiding (batch) import Control.Concurrent.Async @@ -18,7 +19,6 @@ import Data.List.Extra hiding (delete) import Data.Timeout (TimeoutUnit (..), (#)) import Imports hiding (min, threadDelay) import Network.AMQP qualified as Q -import Network.AMQP.Extended (withConnection) import Network.WebSockets import Network.WebSockets qualified as WS import System.Logger qualified as Log @@ -78,12 +78,11 @@ drainRabbitQueues e = do rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp rabbitMQWebSocketApp uid cid e pendingConn = do wsVar <- newEmptyMVar - msgVar <- newEmptyMVar bracket (openWebSocket wsVar) closeWebSocket $ \(wsConn, _) -> ( do sendFullSyncMessageIfNeeded wsVar wsConn uid cid e - sendNotifications wsConn msgVar wsVar + sendNotifications wsConn wsVar ) `catches` [ handleClientMisbehaving wsConn, handleWebSocketExceptions wsConn @@ -116,28 +115,26 @@ rabbitMQWebSocketApp uid cid e pendingConn = do -- ignore any exceptions when sending the close message void . try @SomeException $ WS.sendClose wsConn ("" :: ByteString) - -- Create a rabbitmq consumer that receives messages and saves them into an MVar - createConsumer :: - Q.Channel -> - MVar (Either Q.AMQPException EventData) -> - IO Q.ConsumerTag - createConsumer chan msgVar = do - Q.consumeMsgs chan (clientNotificationQueueName uid cid) Q.Ack $ - \(msg, envelope) -> case eitherDecode @QueuedNotification msg.msgBody of - Left err -> do - logParseError err - -- This message cannot be parsed, make sure it doesn't requeue. There - -- is no need to throw an error and kill the websocket as this is - -- probably caused by a bug or someone messing with RabbitMQ. - -- - -- The bug case is slightly dangerous as it could drop a lot of events - -- en masse, if at some point we decide that Events should not be - -- pushed as JSONs, hopefully we think of the parsing side if/when - -- that happens. - Q.rejectEnv envelope False - Right notif -> - putMVar msgVar . Right $ - EventData notif envelope.envDeliveryTag + getEventData :: RabbitMqChannel -> IO EventData + getEventData chan = do + (msg, envelope) <- getMessage chan + case eitherDecode @QueuedNotification msg.msgBody of + Left err -> do + logParseError err + -- This message cannot be parsed, make sure it doesn't requeue. There + -- is no need to throw an error and kill the websocket as this is + -- probably caused by a bug or someone messing with RabbitMQ. + -- + -- The bug case is slightly dangerous as it could drop a lot of events + -- en masse, if at some point we decide that Events should not be + -- pushed as JSONs, hopefully we think of the parsing side if/when + -- that happens. + Q.rejectEnv envelope False + -- try again + getEventData chan + Right notif -> do + logEvent notif + pure $ EventData notif envelope.envDeliveryTag handleWebSocketExceptions wsConn = Handler $ @@ -173,45 +170,20 @@ rabbitMQWebSocketApp uid cid e pendingConn = do Log.msg (Log.val "Client sent unexpected ack message") . logClient WS.sendCloseCode wsConn 1003 ("unexpected-ack" :: ByteString) + sendNotifications :: WS.Connection -> - MVar (Either Q.AMQPException EventData) -> MVar (Either ConnectionException MessageClientToServer) -> IO () - sendNotifications wsConn msgVar wsVar = lowerCodensity $ do - -- create rabbitmq connection - conn <- Codensity $ withConnection e.logg e.rabbitmq - - -- Store it in the env - let key = mkKeyRabbit uid cid - D.insert key conn e.rabbitConnections - - -- create rabbitmq channel - amqpChan <- Codensity $ bracket (Q.openChannel conn) Q.closeChannel - - -- propagate rabbitmq connection failure - lift $ Q.addConnectionClosedHandler conn True $ do - void $ D.remove key e.rabbitConnections - putMVar msgVar $ - Left (Q.ConnectionClosedException Q.Normal "") - - -- register consumer that pushes rabbitmq messages into msgVar - void $ - Codensity $ - bracket - (createConsumer amqpChan msgVar) - (Q.cancelConsumer amqpChan) + sendNotifications wsConn wsVar = lowerCodensity $ do + chan <- lift $ createChannel e.pool (clientNotificationQueueName uid cid) - -- get data from msgVar and push to client let consumeRabbitMq = forever $ do - eventData' <- takeMVar msgVar - either throwIO pure eventData' >>= \eventData -> do - logEvent eventData.event - catch (WS.sendBinaryData wsConn (encode (EventMessage eventData))) $ - \(err :: SomeException) -> do - logSendFailure err - void $ D.remove key e.rabbitConnections - throwIO err + eventData <- getEventData chan + catch (WS.sendBinaryData wsConn (encode (EventMessage eventData))) $ + \(err :: SomeException) -> do + logSendFailure err + throwIO err -- get ack from wsVar and forward to rabbitmq let consumeWebsocket = forever $ do @@ -220,7 +192,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do AckFullSync -> throwIO UnexpectedAck AckMessage ackData -> do logAckReceived ackData - void $ Q.ackMsg amqpChan ackData.deliveryTag ackData.multiple + void $ ackMessage chan ackData.deliveryTag ackData.multiple -- run both loops concurrently, so that -- - notifications are delivered without having to wait for acks diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index 7596b6d2fab..e09cf7d80db 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -36,7 +36,8 @@ import Control.Concurrent.Async qualified as Async import Control.Exception qualified as E import Control.Exception.Safe (catchAny) import Control.Lens ((^.)) -import Control.Monad.Catch (MonadCatch, finally) +import Control.Monad.Catch (MonadCatch) +import Control.Monad.Codensity import Data.Metrics.Servant import Data.Proxy import Data.Text (pack, strip) @@ -67,26 +68,32 @@ import Wire.OpenTelemetry (withTracer) type CombinedAPI = CannonAPI :<|> Internal.API run :: Opts -> IO () -run o = withTracer \tracer -> do +run o = lowerCodensity $ do + tracer <- Codensity withTracer when (o ^. drainOpts . millisecondsBetweenBatches == 0) $ error "drainOpts.millisecondsBetweenBatches must not be set to 0." when (o ^. drainOpts . gracePeriodSeconds == 0) $ error "drainOpts.gracePeriodSeconds must not be set to 0." - ext <- loadExternal - g <- L.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. logFormat) - cassandra <- defInitCassandra (o ^. cassandraOpts) g - e <- - mkEnv ext o cassandra g - <$> D.empty connectionLimit - <*> D.empty connectionLimit - <*> newManager defaultManagerSettings {managerConnCount = connectionLimit} - <*> createSystemRandom - <*> mkClock - <*> pure (o ^. Cannon.Options.rabbitmq) - refreshMetricsThread <- Async.async $ runCannon e refreshMetrics + ext <- lift loadExternal + g <- + Codensity $ + E.bracket + (L.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. logFormat)) + L.close + cassandra <- lift $ defInitCassandra (o ^. cassandraOpts) g + + e <- do + d1 <- D.empty connectionLimit + d2 <- D.empty connectionLimit + man <- lift $ newManager defaultManagerSettings {managerConnCount = connectionLimit} + rnd <- lift createSystemRandom + clk <- lift mkClock + mkEnv ext o cassandra g d1 d2 man rnd clk (o ^. Cannon.Options.rabbitmq) + + void $ Codensity $ Async.withAsync $ runCannon e refreshMetrics s <- newSettings $ Server (o ^. cannon . host) (o ^. cannon . port) (applog e) (Just idleTimeout) - otelMiddleWare <- newOpenTelemetryWaiMiddleware + otelMiddleWare <- lift newOpenTelemetryWaiMiddleware let middleware :: Wai.Middleware middleware = versionMiddleware (foldMap expandVersionExp (o ^. disabledAPIVersions)) @@ -101,17 +108,19 @@ run o = withTracer \tracer -> do server = hoistServer (Proxy @CannonAPI) (runCannonToServant e) publicAPIServer :<|> hoistServer (Proxy @Internal.API) (runCannonToServant e) internalServer - tid <- myThreadId - E.handle uncaughtExceptionHandler $ do - void $ installHandler sigTERM (signalHandler (env e) tid) Nothing - void $ installHandler sigINT (signalHandler (env e) tid) Nothing - inSpan tracer "cannon" defaultSpanArguments {kind = Otel.Server} (runSettings s app) `finally` do + tid <- lift myThreadId + + Codensity $ \k -> + inSpan tracer "cannon" defaultSpanArguments {kind = Otel.Server} (k ()) + lift $ + E.handle uncaughtExceptionHandler $ do + void $ installHandler sigTERM (signalHandler (env e) tid) Nothing + void $ installHandler sigINT (signalHandler (env e) tid) Nothing -- FUTUREWORK(@akshaymankar, @fisx): we may want to call `runSettingsWithShutdown` here, -- but it's a sensitive change, and it looks like this is closing all the websockets at -- the same time and then calling the drain script. I suspect this might be due to some -- cleanup in wai. this needs to be tested very carefully when touched. - Async.cancel refreshMetricsThread - L.close (applog e) + runSettings s app where idleTimeout = fromIntegral $ maxPingInterval + 3 -- Each cannon instance advertises its own location (ip or dns name) to gundeck. diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index ec6e24d729a..1112fe712af 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -34,12 +34,14 @@ import Bilge (Manager) import Bilge.RPC (HasRequestId (..)) import Cannon.Dict (Dict) import Cannon.Options +import Cannon.RabbitMq import Cannon.WS (Clock, Key, Websocket) import Cannon.WS qualified as WS import Cassandra (ClientState) import Control.Concurrent.Async (mapConcurrently) import Control.Lens ((^.)) import Control.Monad.Catch +import Control.Monad.Codensity import Data.Id import Data.Text.Encoding import Imports @@ -106,10 +108,31 @@ mkEnv :: GenIO -> Clock -> AmqpEndpoint -> - Env -mkEnv external o cs l d conns p g t rabbitmqOpts = - Env o l d conns (RequestId defRequestId) $ - WS.env external (o ^. cannon . port) (encodeUtf8 $ o ^. gundeck . host) (o ^. gundeck . port) l p d conns g t (o ^. drainOpts) rabbitmqOpts cs + Codensity IO Env +mkEnv external o cs l d conns p g t endpoint = do + let poolOpts = + RabbitMqPoolOptions + { endpoint = endpoint, + maxConnections = 10, -- TODO + maxChannels = 100 -- TODO + } + pool <- createRabbitMqPool poolOpts l + let wsEnv = + WS.env + external + (o ^. cannon . port) + (encodeUtf8 $ o ^. gundeck . host) + (o ^. gundeck . port) + l + p + d + conns + g + t + (o ^. drainOpts) + cs + pool + pure $ Env o l d conns (RequestId defRequestId) wsEnv runCannon :: Env -> Cannon a -> IO a runCannon e c = runReaderT (unCannon c) e diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index f868a033bb6..f5ba8cdf997 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -53,6 +53,7 @@ import Bilge.Retry import Cannon.Dict (Dict) import Cannon.Dict qualified as D import Cannon.Options (DrainOpts, gracePeriodSeconds, millisecondsBetweenBatches, minBatchSize) +import Cannon.RabbitMq import Cassandra (ClientState) import Conduit import Control.Concurrent.Timeout @@ -70,7 +71,6 @@ import Data.Text.Encoding (decodeUtf8) import Data.Timeout (TimeoutUnit (..), (#)) import Imports hiding (threadDelay) import Network.AMQP qualified as Q -import Network.AMQP.Extended import Network.HTTP.Types.Method import Network.HTTP.Types.Status import Network.Wai.Utilities.Error @@ -154,8 +154,8 @@ data Env = Env rand :: !GenIO, clock :: !Clock, drainOpts :: DrainOpts, - rabbitmq :: !AmqpEndpoint, - cassandra :: ClientState + cassandra :: ClientState, + pool :: RabbitMqPool } setRequestId :: RequestId -> Env -> Env @@ -202,8 +202,8 @@ env :: GenIO -> Clock -> DrainOpts -> - AmqpEndpoint -> ClientState -> + RabbitMqPool -> Env env leh lp gh gp = Env leh lp (Bilge.host gh . Bilge.port gp $ empty) (RequestId defRequestId) From fc963601c9da77dc64440a58f7fce67ced42dbf5 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Thu, 7 Nov 2024 09:28:39 +0100 Subject: [PATCH 04/31] Add rabbitmq channel finaliser --- services/cannon/src/Cannon/RabbitMq.hs | 26 ++++++++++--------- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 2 +- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index f8eed7e7ab7..67c1fb21b9e 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -131,7 +131,9 @@ openConnection pool = do ) data RabbitMqChannel = RabbitMqChannel - { inner :: MVar Q.Channel, + { -- | The current channel. The var is empty while the channel is being + -- re-established. + inner :: MVar Q.Channel, msgVar :: MVar (Maybe (Q.Message, Q.Envelope)) } @@ -143,11 +145,13 @@ ackMessage chan deliveryTag multiple = do inner <- readMVar chan.inner Q.ackMsg inner deliveryTag multiple -createChannel :: RabbitMqPool -> Text -> IO RabbitMqChannel +createChannel :: RabbitMqPool -> Text -> Codensity IO RabbitMqChannel createChannel pool queue = do - closedVar <- newEmptyMVar - inner <- newEmptyMVar - msgVar <- newEmptyMVar + closedVar <- lift newEmptyMVar + inner <- lift newEmptyMVar + msgVar <- lift newEmptyMVar + + -- TODO: handle exceptions in the manager thread let manageChannel = do conn <- acquireConnection pool chan <- Q.openChannel conn.inner @@ -181,8 +185,10 @@ createChannel pool queue = do else putMVar msgVar Nothing -- TODO: leaking async - void . mask_ $ async manageChannel - pure RabbitMqChannel {inner = inner, msgVar = msgVar} + Codensity $ \k -> do + void . mask_ $ async manageChannel + let chan = RabbitMqChannel {inner = inner, msgVar = msgVar} + k chan `finally` (readMVar inner >>= Q.closeChannel) acquireConnection :: RabbitMqPool -> IO PooledConnection acquireConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do @@ -192,11 +198,7 @@ acquireConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ d when (pconn.numChannels >= pool.opts.maxChannels) $ throwE TooManyChannels - let pconn' = - pconn' - { numChannels = - max pool.opts.maxChannels (succ (numChannels pconn)) - } + let pconn' = pconn {numChannels = succ (numChannels pconn)} lift $ writeTVar pool.connections $! map (\c -> if c.connId == pconn'.connId then pconn' else c) conns diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 4c4d09ae529..15efe271d88 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -176,7 +176,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do MVar (Either ConnectionException MessageClientToServer) -> IO () sendNotifications wsConn wsVar = lowerCodensity $ do - chan <- lift $ createChannel e.pool (clientNotificationQueueName uid cid) + chan <- createChannel e.pool (clientNotificationQueueName uid cid) let consumeRabbitMq = forever $ do eventData <- getEventData chan From 1049f36f58de087ac74c7155a907dbfd820321ae Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Fri, 8 Nov 2024 13:25:24 +0100 Subject: [PATCH 05/31] Create connections when necessary --- services/cannon/src/Cannon/RabbitMq.hs | 53 +++++++++++++++++++------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 67c1fb21b9e..94d473f0f5e 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -62,12 +62,12 @@ createRabbitMqPool opts logger = Codensity $ bracket create destroy atomically $ (,) <$> newTVar 0 <*> newTVar [] let pool = RabbitMqPool {..} - -- create one connection - void $ createConnection pool + -- -- create one connection + -- void $ createConnection pool pure pool destroy pool = putMVar pool.deadVar () -createConnection :: RabbitMqPool -> IO Q.Connection +createConnection :: RabbitMqPool -> IO PooledConnection createConnection pool = mask_ $ do conn <- openConnection pool pconn <- atomically $ do @@ -99,7 +99,7 @@ createConnection pool = mask_ $ do filter (\c -> c.connId /= pconn.connId) conns Q.addConnectionClosedHandler conn True $ do putMVar closedVar () - pure conn + pure pconn openConnection :: RabbitMqPool -> IO Q.Connection openConnection pool = do @@ -191,18 +191,45 @@ createChannel pool queue = do k chan `finally` (readMVar inner >>= Q.closeChannel) acquireConnection :: RabbitMqPool -> IO PooledConnection -acquireConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do - conns <- lift $ readTVar pool.connections - let pconn = minimumOn (.numChannels) conns - -- TODO: spawn new connection if possible - when (pconn.numChannels >= pool.opts.maxChannels) $ - throwE TooManyChannels +acquireConnection pool = do + pconn <- + findConnection pool >>= \case + Nothing -> do + bracketOnError + ( do + conn <- createConnection pool + -- if we have too many connections at this point, give up + numConnections <- + atomically $ + length <$> readTVar pool.connections + when (numConnections > pool.opts.maxConnections) $ + throw TooManyChannels + pure conn + ) + (\conn -> Q.closeConnection conn.inner) + pure + Just conn -> pure conn - let pconn' = pconn {numChannels = succ (numChannels pconn)} - lift $ + atomically $ do + let pconn' = pconn {numChannels = succ (numChannels pconn)} + conns <- readTVar pool.connections writeTVar pool.connections $! map (\c -> if c.connId == pconn'.connId then pconn' else c) conns - pure pconn' + pure pconn' + +findConnection :: RabbitMqPool -> IO (Maybe PooledConnection) +findConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do + conns <- lift $ readTVar pool.connections + if null conns + then pure Nothing + else do + let pconn = minimumOn (.numChannels) conns + if pconn.numChannels >= pool.opts.maxChannels + then + if length conns >= pool.opts.maxChannels + then throwE TooManyChannels + else pure Nothing + else pure (Just pconn) releaseConnection :: RabbitMqPool -> PooledConnection -> IO () releaseConnection pool conn = atomically $ do From 32c6fcd68b083ab68eccc064fc92e0e0973683e4 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 11 Nov 2024 08:32:16 +0100 Subject: [PATCH 06/31] Add rabbitmq options to cannon --- services/cannon/src/Cannon/Options.hs | 8 +++++++- services/cannon/src/Cannon/Types.hs | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/services/cannon/src/Cannon/Options.hs b/services/cannon/src/Cannon/Options.hs index dad2ad51924..5206357aa08 100644 --- a/services/cannon/src/Cannon/Options.hs +++ b/services/cannon/src/Cannon/Options.hs @@ -32,6 +32,8 @@ module Cannon.Options drainOpts, rabbitmq, cassandraOpts, + rabbitMqMaxConnections, + rabbitMqMaxChannels, Opts, gracePeriodSeconds, millisecondsBetweenBatches, @@ -98,7 +100,9 @@ data Opts = Opts _optsLogFormat :: !(Maybe (Last LogFormat)), _optsDrainOpts :: DrainOpts, _optsDisabledAPIVersions :: !(Set VersionExp), - _optsCassandraOpts :: !CassandraOpts + _optsCassandraOpts :: !CassandraOpts, + _optsRabbitMqMaxConnections :: Int, + _optsRabbitMqMaxChannels :: Int } deriving (Show, Generic) @@ -116,3 +120,5 @@ instance FromJSON Opts where <*> o .: "drainOpts" <*> o .: "disabledAPIVersions" <*> o .: "cassandra" + <*> o .:? "rabbitMqMaxConnections" .!= 30 + <*> o .:? "rabbitMqMaxChannels" .!= 300 diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index 1112fe712af..2a246a49c68 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -113,8 +113,8 @@ mkEnv external o cs l d conns p g t endpoint = do let poolOpts = RabbitMqPoolOptions { endpoint = endpoint, - maxConnections = 10, -- TODO - maxChannels = 100 -- TODO + maxConnections = o ^. rabbitMqMaxConnections, + maxChannels = o ^. rabbitMqMaxChannels } pool <- createRabbitMqPool poolOpts l let wsEnv = From 44279aa1f5335284e55a1da83fdc0deb4196945c Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 18 Nov 2024 08:38:58 +0100 Subject: [PATCH 07/31] Use Codensity in event tests --- integration/test/Test/Events.hs | 76 ++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 29f6fd4f945..5786d5ccc67 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -6,6 +6,8 @@ import API.Common import API.Galley import API.Gundeck import qualified Control.Concurrent.Timeout as Timeout +import Control.Monad.Codensity +import Control.Monad.Trans.Class import Control.Retry import Data.ByteString.Conversion (toByteString') import qualified Data.Text as Text @@ -34,7 +36,7 @@ testConsumeEventsOneWebSocket = do client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 clientId <- objId client - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do deliveryTag <- assertEvent eventsChan $ \e -> do e %. "type" `shouldMatch` "event" e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" @@ -71,9 +73,11 @@ testConsumeEventsForDifferentUsers = do bobClient <- addClient bob def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 bobClientId <- objId bobClient - withEventsWebSockets [(alice, aliceClientId), (bob, bobClientId)] $ \[(aliceEventsChan, aliceAckChan), (bobEventsChan, bobAckChan)] -> do - assertClientAdd aliceClientId aliceEventsChan aliceAckChan - assertClientAdd bobClientId bobEventsChan bobAckChan + lowerCodensity $ do + (aliceEventsChan, aliceAckChan) <- createEventsWebSocket alice aliceClientId + (bobEventsChan, bobAckChan) <- createEventsWebSocket bob bobClientId + lift $ assertClientAdd aliceClientId aliceEventsChan aliceAckChan + lift $ assertClientAdd bobClientId bobEventsChan bobAckChan where assertClientAdd :: (HasCallStack) => String -> TChan Value -> TChan Value -> App () assertClientAdd clientId eventsChan ackChan = do @@ -108,7 +112,7 @@ testConsumeEventsWhileHavingLegacyClients = do oldNotif <- awaitMatch isUserClientAddNotif oldWS oldNotif %. "payload.0.client.id" `shouldMatch` newClientId - withEventsWebSocket alice newClientId $ \eventsChan _ -> + runCodensity (createEventsWebSocket alice newClientId) $ \(eventsChan, _) -> assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` newClientId @@ -126,20 +130,20 @@ testConsumeEventsAcks = do client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 clientId <- objId client - withEventsWebSocket alice clientId $ \eventsChan _ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _ackChan) -> do assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId -- without ack, we receive the same event again - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do deliveryTag <- assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId e %. "data.delivery_tag" sendAck ackChan deliveryTag False - withEventsWebSocket alice clientId $ \eventsChan _ -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _) -> do assertNoEvent eventsChan testConsumeEventsMultipleAcks :: (HasCallStack) => App () @@ -152,7 +156,7 @@ testConsumeEventsMultipleAcks = do handle <- randomHandle putHandle alice handle >>= assertSuccess - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId @@ -164,7 +168,7 @@ testConsumeEventsMultipleAcks = do sendAck ackChan deliveryTag True - withEventsWebSocket alice clientId $ \eventsChan _ -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _) -> do assertNoEvent eventsChan testConsumeEventsAckNewEventWithoutAckingOldOne :: (HasCallStack) => App () @@ -177,7 +181,7 @@ testConsumeEventsAckNewEventWithoutAckingOldOne = do handle <- randomHandle putHandle alice handle >>= assertSuccess - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId @@ -191,7 +195,7 @@ testConsumeEventsAckNewEventWithoutAckingOldOne = do sendAck ackChan deliveryTagHandleAdd False -- Expect client-add event to be delivered again. - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do deliveryTagClientAdd <- assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId @@ -199,7 +203,7 @@ testConsumeEventsAckNewEventWithoutAckingOldOne = do sendAck ackChan deliveryTagClientAdd False - withEventsWebSocket alice clientId $ \eventsChan _ -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _) -> do assertNoEvent eventsChan testEventsDeadLettered :: (HasCallStack) => App () @@ -219,7 +223,7 @@ testEventsDeadLettered = do handle1 <- randomHandle putHandle alice handle1 >>= assertSuccess - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do assertEvent eventsChan $ \e -> do e %. "type" `shouldMatch` "notifications.missed" @@ -246,7 +250,7 @@ testTransientEventsDoNotTriggerDeadLetters = do clientId <- objId client -- consume it - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "type" `shouldMatch` "event" @@ -261,7 +265,7 @@ testTransientEventsDoNotTriggerDeadLetters = do -- Typing status is transient, currently no one is listening. sendTypingStatus alice selfConvId "started" >>= assertSuccess - withEventsWebSocket alice clientId $ \eventsChan _ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _ackChan) -> do assertNoEvent eventsChan testTransientEvents :: (HasCallStack) => App () @@ -275,7 +279,7 @@ testTransientEvents = do -- indicators, so we don't have to create another conv. selfConvId <- objQidObject alice - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do consumeAllEvents eventsChan ackChan sendTypingStatus alice selfConvId "started" >>= assertSuccess assertEvent eventsChan $ \e -> do @@ -295,32 +299,34 @@ testTransientEvents = do -- We shouldn't see the stopped typing status because we were not connected to -- the websocket when it was sent. The other events should still show up in -- order. - withEventsWebSocket alice clientId $ \eventsChan ackChan -> do + runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do for_ [handle1, handle2] $ \handle -> assertEvent eventsChan $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.update" - e %. "data.event.payload.0.user.handle" `shouldMatch` handle - ackEvent ackChan e + e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing" + e %. "data.event.payload.0.qualified_conversation" `shouldMatch` selfConvId + deliveryTag <- e %. "data.delivery_tag" + sendAck ackChan deliveryTag False assertNoEvent eventsChan ---------------------------------------------------------------------- -- helpers -withEventsWebSockets :: forall uid a. (HasCallStack, MakesValue uid) => [(uid, String)] -> ([(TChan Value, TChan Value)] -> App a) -> App a -withEventsWebSockets userClients k = go [] $ reverse userClients - where - go :: [(TChan Value, TChan Value)] -> [(uid, String)] -> App a - go chans [] = k chans - go chans ((uid, cid) : remaining) = - withEventsWebSocket uid cid $ \eventsChan ackChan -> - go ((eventsChan, ackChan) : chans) remaining - -withEventsWebSocket :: (HasCallStack, MakesValue uid) => uid -> String -> (TChan Value -> TChan Value -> App a) -> App a -withEventsWebSocket uid cid k = do - closeWS <- newEmptyMVar - bracket (setup closeWS) (\(_, _, wsThread) -> cancel wsThread) $ \(eventsChan, ackChan, wsThread) -> do - x <- k eventsChan ackChan +createEventsWebSocket :: + (HasCallStack, MakesValue uid) => + uid -> + String -> + Codensity App (TChan Value, TChan Value) +createEventsWebSocket uid cid = do + closeWS <- lift newEmptyMVar + (eventsChan, ackChan, wsThread) <- + Codensity + $ bracket + (setup closeWS) + (\(_, _, wsThread) -> cancel wsThread) + + Codensity $ \k -> do + x <- k (eventsChan, ackChan) -- Ensure all the acks are sent before closing the websocket isAckChanEmpty <- From 0316b09aaaa8c0b7bb682538289a04a4e71e1b39 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 18 Nov 2024 10:55:48 +0100 Subject: [PATCH 08/31] Refactor web socket test code --- integration/test/Test/Events.hs | 94 +++++++++++++++------------------ 1 file changed, 43 insertions(+), 51 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 5786d5ccc67..24f1474faae 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -317,13 +317,49 @@ createEventsWebSocket :: uid -> String -> Codensity App (TChan Value, TChan Value) -createEventsWebSocket uid cid = do - closeWS <- lift newEmptyMVar - (eventsChan, ackChan, wsThread) <- - Codensity - $ bracket - (setup closeWS) - (\(_, _, wsThread) -> cancel wsThread) +createEventsWebSocket user cid = do + closeWS <- liftIO newEmptyMVar + eventsChan <- liftIO newTChanIO + ackChan <- liftIO newTChanIO + serviceMap <- lift $ getServiceMap =<< objDomain user + uid <- lift $ objId =<< objQidObject user + let HostPort caHost caPort = serviceHostPort serviceMap Cannon + path = "/events?client=" <> cid + caHdrs = [(fromString "Z-User", toByteString' uid)] + app conn = + race_ + ( wsRead conn `catch` \(e :: WS.ConnectionException) -> + case e of + WS.CloseRequest {} -> pure () + _ -> throwIO e + ) + (wsWrite conn) + + wsRead conn = forever $ do + bs <- WS.receiveData conn + case decodeStrict' bs of + Just n -> atomically $ writeTChan eventsChan n + Nothing -> + error $ "Failed to decode events: " ++ show bs + + wsWrite conn = forever $ do + eitherAck <- race (readMVar closeWS) (atomically $ readTChan ackChan) + case eitherAck of + Left () -> WS.sendClose conn (Text.pack "") + Right ack -> WS.sendBinaryData conn (encode ack) + + wsThread <- Codensity $ \k -> do + withAsync + ( liftIO + $ WS.runClientWith + caHost + (fromIntegral caPort) + path + WS.defaultConnectionOptions + caHdrs + app + ) + k Codensity $ \k -> do x <- k (eventsChan, ackChan) @@ -345,12 +381,6 @@ createEventsWebSocket uid cid = do Just () -> pure () pure x - where - setup :: (HasCallStack) => MVar () -> App (TChan Value, TChan Value, Async ()) - setup closeWS = do - (eventsChan, ackChan) <- liftIO $ (,) <$> newTChanIO <*> newTChanIO - wsThread <- eventsWebSocket uid cid eventsChan ackChan closeWS - pure (eventsChan, ackChan, wsThread) sendMsg :: (HasCallStack) => TChan Value -> Value -> App () sendMsg eventsChan msg = liftIO $ atomically $ writeTChan eventsChan msg @@ -401,41 +431,3 @@ consumeAllEvents eventsChan ackChan = do Just e -> do ackEvent ackChan e consumeAllEvents eventsChan ackChan - -eventsWebSocket :: (MakesValue user) => user -> String -> TChan Value -> TChan Value -> MVar () -> App (Async ()) -eventsWebSocket user clientId eventsChan ackChan closeWS = do - serviceMap <- getServiceMap =<< objDomain user - uid <- objId =<< objQidObject user - let HostPort caHost caPort = serviceHostPort serviceMap Cannon - path = "/events?client=" <> clientId - caHdrs = [(fromString "Z-User", toByteString' uid)] - app conn = do - r <- - async $ wsRead conn `catch` \(e :: WS.ConnectionException) -> - case e of - WS.CloseRequest {} -> pure () - _ -> throwIO e - w <- async $ wsWrite conn - void $ waitAny [r, w] - - wsRead conn = forever $ do - bs <- WS.receiveData conn - case decodeStrict' bs of - Just n -> atomically $ writeTChan eventsChan n - Nothing -> - error $ "Failed to decode events: " ++ show bs - - wsWrite conn = forever $ do - eitherAck <- race (readMVar closeWS) (atomically $ readTChan ackChan) - case eitherAck of - Left () -> WS.sendClose conn (Text.pack "") - Right ack -> WS.sendBinaryData conn (encode ack) - liftIO - $ async - $ WS.runClientWith - caHost - (fromIntegral caPort) - path - WS.defaultConnectionOptions - caHdrs - app From 9cc5aefd72db944d18dd5b95ad78d33f9c5fcc8a Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 11 Nov 2024 09:18:32 +0100 Subject: [PATCH 09/31] Fix connection pool deadlock --- integration/test/Test/Events.hs | 28 ++++++++++ services/cannon/cannon.cabal | 1 + services/cannon/src/Cannon/RabbitMq.hs | 56 ++++++++++--------- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 17 +++++- 4 files changed, 73 insertions(+), 29 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 24f1474faae..9b363496b61 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -309,6 +309,34 @@ testTransientEvents = do assertNoEvent eventsChan +testChannelLimit :: (HasCallStack) => App () +testChannelLimit = withModifiedBackend + ( def + { cannonCfg = + setField "rabbitMqMaxChannels" (2 :: Int) + >=> setField "rabbitMqMaxConnections" (1 :: Int) + } + ) + $ \domain -> do + alice <- randomUser domain def + clients <- + replicateM 3 + $ addClient alice def {acapabilities = Just ["consumable-notifications"]} + >>= getJSON 201 + >>= (%. "id") + >>= asString + + lowerCodensity $ do + acks <- for clients $ \c -> do + (events0, ack0) <- createEventsWebSocket alice c + e <- Codensity $ \k -> assertEvent events0 k + lift $ do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` c + tag <- e %. "data.delivery_tag" + pure (sendAck ack0 tag False) + lift $ sequenceA_ acks + ---------------------------------------------------------------------- -- helpers diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index cefc2da8054..2e817ba9c6f 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -110,6 +110,7 @@ library , servant-conduit , servant-server , strict >=0.3.2 + , temporary , text >=1.1 , tinylog >=0.10 , transformers diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 94d473f0f5e..38169635710 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -151,44 +151,47 @@ createChannel pool queue = do inner <- lift newEmptyMVar msgVar <- lift newEmptyMVar - -- TODO: handle exceptions in the manager thread + let handleException e = do + retry <- case (Q.isNormalChannelClose e, fromException e) of + (True, _) -> do + Log.info pool.logger $ + Log.msg (Log.val "RabbitMQ channel is closed normally, not attempting to reopen channel") + pure False + (_, Just (Q.ConnectionClosedException {})) -> do + Log.info pool.logger $ + Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel") + pure False -- TODO: change this to true? + _ -> do + logException pool.logger "RabbitMQ channel closed" e + pure True + putMVar closedVar retry + let manageChannel = do conn <- acquireConnection pool chan <- Q.openChannel conn.inner + Q.addChannelExceptionHandler chan handleException putMVar inner chan void $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do putMVar msgVar (Just (message, envelope)) - Q.addChannelExceptionHandler chan $ \e -> do - void $ takeMVar inner - releaseConnection pool conn + retry <- takeMVar closedVar + + -- TODO: releaseConnection - retry <- case (Q.isNormalChannelClose e, fromException e) of - (True, _) -> do - Log.info pool.logger $ - Log.msg (Log.val "RabbitMQ channel is closed normally, not attempting to reopen channel") - pure False - (_, Just (Q.ConnectionClosedException {})) -> do - Log.info pool.logger $ - Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel") - pure False -- TODO: change this to true? - _ -> do - logException pool.logger "RabbitMQ channel closed" e - pure True + -- TODO: put this in a bracket + catch (Q.closeChannel chan) $ \(_ :: SomeException) -> pure () - putMVar closedVar retry + when retry manageChannel - retry <- takeMVar closedVar - if retry - then -- TODO: exponential backoff? - manageChannel - else putMVar msgVar Nothing + lift . void . mask_ $ + async $ + catch manageChannel handleException + `finally` putMVar msgVar Nothing - -- TODO: leaking async Codensity $ \k -> do - void . mask_ $ async manageChannel let chan = RabbitMqChannel {inner = inner, msgVar = msgVar} - k chan `finally` (readMVar inner >>= Q.closeChannel) + finally (k chan) $ + putMVar closedVar False acquireConnection :: RabbitMqPool -> IO PooledConnection acquireConnection pool = do @@ -219,6 +222,7 @@ acquireConnection pool = do findConnection :: RabbitMqPool -> IO (Maybe PooledConnection) findConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do + -- TODO: use MaybeT conns <- lift $ readTVar pool.connections if null conns then pure Nothing @@ -226,7 +230,7 @@ findConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do let pconn = minimumOn (.numChannels) conns if pconn.numChannels >= pool.opts.maxChannels then - if length conns >= pool.opts.maxChannels + if length conns >= pool.opts.maxConnections then throwE TooManyChannels else pure Nothing else pure (Just pconn) diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 15efe271d88..2ea537662c9 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -1,3 +1,4 @@ +{-# OPTIONS -Wwarn #-} {-# LANGUAGE RecordWildCards #-} module Cannon.RabbitMqConsumerApp where @@ -10,17 +11,21 @@ import Cannon.WS hiding (env) import Cassandra as C hiding (batch) import Control.Concurrent.Async import Control.Concurrent.Timeout -import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try) +import Control.Exception (Handler (..), bracket, catch, catches, evaluate, throwIO, try) import Control.Lens hiding ((#)) import Control.Monad.Codensity import Data.Aeson hiding (Key) import Data.Id import Data.List.Extra hiding (delete) import Data.Timeout (TimeoutUnit (..), (#)) +import Debug.Trace import Imports hiding (min, threadDelay) import Network.AMQP qualified as Q import Network.WebSockets import Network.WebSockets qualified as WS +import System.IO (hPutStrLn) +import System.IO.Temp +import System.IO.Unsafe import System.Logger qualified as Log import UnliftIO.Async (pooledMapConcurrentlyN_) import Wire.API.Event.WebSocketProtocol @@ -85,7 +90,8 @@ rabbitMQWebSocketApp uid cid e pendingConn = do sendNotifications wsConn wsVar ) `catches` [ handleClientMisbehaving wsConn, - handleWebSocketExceptions wsConn + handleWebSocketExceptions wsConn, + handleOtherExceptions wsConn ] where logClient = @@ -99,6 +105,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do -- start a reader thread for client messages -- this needs to run asynchronously in order to promptly react to -- client-side connection termination + -- TODO: read websocket directly instead of using an MVar? a <- async $ forever $ do catch ( do @@ -108,7 +115,6 @@ rabbitMQWebSocketApp uid cid e pendingConn = do $ \err -> putMVar wsVar (Left err) pure (wsConn, a) - -- this is only needed in case of asynchronous exceptions closeWebSocket (wsConn, a) = do cancel a logCloseWebsocket @@ -171,6 +177,11 @@ rabbitMQWebSocketApp uid cid e pendingConn = do . logClient WS.sendCloseCode wsConn 1003 ("unexpected-ack" :: ByteString) + handleOtherExceptions wsConn = Handler $ + \(err :: SomeException) -> do + WS.sendCloseCode wsConn 1003 ("internal-error" :: ByteString) + throwIO err + sendNotifications :: WS.Connection -> MVar (Either ConnectionException MessageClientToServer) -> From 3a187bf568d255ca5ad5c4f0f5f2301a68fcd531 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 18 Nov 2024 11:03:59 +0100 Subject: [PATCH 10/31] Replace TChan with MVar --- integration/test/Test/Events.hs | 79 +++++++++++++-------------------- 1 file changed, 30 insertions(+), 49 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 9b363496b61..948e69db337 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -16,7 +16,6 @@ import qualified Network.WebSockets as WS import Notifications import SetupHelpers import Testlib.Prelude hiding (assertNoEvent) -import Testlib.Printing import UnliftIO hiding (handle) -- FUTUREWORK: Investigate why these tests are failing without @@ -79,7 +78,7 @@ testConsumeEventsForDifferentUsers = do lift $ assertClientAdd aliceClientId aliceEventsChan aliceAckChan lift $ assertClientAdd bobClientId bobEventsChan bobAckChan where - assertClientAdd :: (HasCallStack) => String -> TChan Value -> TChan Value -> App () + assertClientAdd :: (HasCallStack) => String -> MVar Value -> MVar (Maybe Value) -> App () assertClientAdd clientId eventsChan ackChan = do deliveryTag <- assertEvent eventsChan $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" @@ -344,11 +343,10 @@ createEventsWebSocket :: (HasCallStack, MakesValue uid) => uid -> String -> - Codensity App (TChan Value, TChan Value) + Codensity App (MVar Value, MVar (Maybe Value)) createEventsWebSocket user cid = do - closeWS <- liftIO newEmptyMVar - eventsChan <- liftIO newTChanIO - ackChan <- liftIO newTChanIO + eventsChan <- liftIO newEmptyMVar + ackChan <- liftIO newEmptyMVar serviceMap <- lift $ getServiceMap =<< objDomain user uid <- lift $ objId =<< objQidObject user let HostPort caHost caPort = serviceHostPort serviceMap Cannon @@ -366,15 +364,15 @@ createEventsWebSocket user cid = do wsRead conn = forever $ do bs <- WS.receiveData conn case decodeStrict' bs of - Just n -> atomically $ writeTChan eventsChan n + Just n -> putMVar eventsChan n Nothing -> error $ "Failed to decode events: " ++ show bs wsWrite conn = forever $ do - eitherAck <- race (readMVar closeWS) (atomically $ readTChan ackChan) - case eitherAck of - Left () -> WS.sendClose conn (Text.pack "") - Right ack -> WS.sendBinaryData conn (encode ack) + mAck <- takeMVar ackChan + case mAck of + Nothing -> WS.sendClose conn (Text.pack "") + Just ack -> WS.sendBinaryData conn (encode ack) wsThread <- Codensity $ \k -> do withAsync @@ -389,43 +387,26 @@ createEventsWebSocket user cid = do ) k - Codensity $ \k -> do - x <- k (eventsChan, ackChan) + Codensity $ \k -> + k (eventsChan, ackChan) `finally` do + putMVar ackChan Nothing + liftIO $ wait wsThread - -- Ensure all the acks are sent before closing the websocket - isAckChanEmpty <- - retrying - (limitRetries 5 <> constantDelay 10_000) - (\_ isEmpty -> pure $ not isEmpty) - (\_ -> atomically $ isEmptyTChan ackChan) - unless isAckChanEmpty $ do - putStrLn $ colored yellow $ "The ack chan is not empty after 50ms, some acks may not make it to the server" - - void $ tryPutMVar closeWS () - - timeout 1_000_000 (wait wsThread) >>= \case - Nothing -> - putStrLn $ colored yellow $ "The websocket thread did not close after waiting for 1s" - Just () -> pure () - - pure x - -sendMsg :: (HasCallStack) => TChan Value -> Value -> App () -sendMsg eventsChan msg = liftIO $ atomically $ writeTChan eventsChan msg - -ackFullSync :: (HasCallStack) => TChan Value -> App () -ackFullSync ackChan = do - sendMsg ackChan - $ object ["type" .= "ack_full_sync"] +ackFullSync :: (HasCallStack) => MVar (Maybe Value) -> App () +ackFullSync ackChan = + putMVar ackChan + $ Just (object ["type" .= "ack_full_sync"]) -ackEvent :: (HasCallStack) => TChan Value -> Value -> App () +ackEvent :: (HasCallStack) => MVar (Maybe Value) -> Value -> App () ackEvent ackChan event = do deliveryTag <- event %. "data.delivery_tag" sendAck ackChan deliveryTag False -sendAck :: (HasCallStack) => TChan Value -> Value -> Bool -> App () -sendAck ackChan deliveryTag multiple = do - sendMsg ackChan +sendAck :: (HasCallStack) => MVar (Maybe Value) -> Value -> Bool -> App () +sendAck ackChan deliveryTag multiple = + do + putMVar ackChan + $ Just $ object [ "type" .= "ack", "data" @@ -435,26 +416,26 @@ sendAck ackChan deliveryTag multiple = do ] ] -assertEvent :: (HasCallStack) => TChan Value -> ((HasCallStack) => Value -> App a) -> App a +assertEvent :: (HasCallStack) => MVar Value -> ((HasCallStack) => Value -> App a) -> App a assertEvent eventsChan expectations = do - timeout 10_000_000 (atomically (readTChan eventsChan)) >>= \case - Nothing -> assertFailure "No event received for 10s" + timeout 10_000_000 (takeMVar eventsChan) >>= \case + Nothing -> assertFailure "No event received for 1s" Just e -> do pretty <- prettyJSON e addFailureContext ("event:\n" <> pretty) $ expectations e -assertNoEvent :: (HasCallStack) => TChan Value -> App () +assertNoEvent :: (HasCallStack) => MVar Value -> App () assertNoEvent eventsChan = do - timeout 1_000_000 (atomically (readTChan eventsChan)) >>= \case + timeout 1_000_000 (takeMVar eventsChan) >>= \case Nothing -> pure () Just e -> do eventJSON <- prettyJSON e assertFailure $ "Did not expect event: \n" <> eventJSON -consumeAllEvents :: TChan Value -> TChan Value -> App () +consumeAllEvents :: MVar Value -> MVar (Maybe Value) -> App () consumeAllEvents eventsChan ackChan = do - timeout 1_000_000 (atomically (readTChan eventsChan)) >>= \case + timeout 1_000_000 (takeMVar eventsChan) >>= \case Nothing -> pure () Just e -> do ackEvent ackChan e From 4fb3cd5e98974fa7378c937c122eaa5b98440ff0 Mon Sep 17 00:00:00 2001 From: Stefan Berthold Date: Mon, 18 Nov 2024 14:45:09 +0000 Subject: [PATCH 11/31] introduce EventWebSocket --- integration/test/Test/Events.hs | 184 +++++++++++++++++--------------- 1 file changed, 95 insertions(+), 89 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 948e69db337..517c0ab2dca 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -35,21 +35,21 @@ testConsumeEventsOneWebSocket = do client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 clientId <- objId client - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - deliveryTag <- assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + deliveryTag <- assertEvent ws $ \e -> do e %. "type" `shouldMatch` "event" e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId e %. "data.delivery_tag" - assertNoEvent eventsChan + assertNoEvent ws - sendAck ackChan deliveryTag False - assertNoEvent eventsChan + sendAck ws deliveryTag False + assertNoEvent ws handle <- randomHandle putHandle alice handle >>= assertSuccess - assertEvent eventsChan $ \e -> do + assertEvent ws $ \e -> do e %. "type" `shouldMatch` "event" e %. "data.event.payload.0.type" `shouldMatch` "user.update" e %. "data.event.payload.0.user.handle" `shouldMatch` handle @@ -73,19 +73,19 @@ testConsumeEventsForDifferentUsers = do bobClientId <- objId bobClient lowerCodensity $ do - (aliceEventsChan, aliceAckChan) <- createEventsWebSocket alice aliceClientId - (bobEventsChan, bobAckChan) <- createEventsWebSocket bob bobClientId - lift $ assertClientAdd aliceClientId aliceEventsChan aliceAckChan - lift $ assertClientAdd bobClientId bobEventsChan bobAckChan + aliceWS <- createEventsWebSocket alice aliceClientId + bobWS <- createEventsWebSocket bob bobClientId + lift $ assertClientAdd aliceClientId aliceWS + lift $ assertClientAdd bobClientId bobWS where - assertClientAdd :: (HasCallStack) => String -> MVar Value -> MVar (Maybe Value) -> App () - assertClientAdd clientId eventsChan ackChan = do - deliveryTag <- assertEvent eventsChan $ \e -> do + assertClientAdd :: (HasCallStack) => String -> EventWebSocket -> App () + assertClientAdd clientId ws = do + deliveryTag <- assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId e %. "data.delivery_tag" - assertNoEvent eventsChan - sendAck ackChan deliveryTag False + assertNoEvent ws + sendAck ws deliveryTag False testConsumeEventsWhileHavingLegacyClients :: (HasCallStack) => App () testConsumeEventsWhileHavingLegacyClients = do @@ -111,8 +111,8 @@ testConsumeEventsWhileHavingLegacyClients = do oldNotif <- awaitMatch isUserClientAddNotif oldWS oldNotif %. "payload.0.client.id" `shouldMatch` newClientId - runCodensity (createEventsWebSocket alice newClientId) $ \(eventsChan, _) -> - assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice newClientId) $ \ws -> + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` newClientId @@ -129,21 +129,21 @@ testConsumeEventsAcks = do client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 clientId <- objId client - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _ackChan) -> do - assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId -- without ack, we receive the same event again - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - deliveryTag <- assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + deliveryTag <- assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId e %. "data.delivery_tag" - sendAck ackChan deliveryTag False + sendAck ws deliveryTag False - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _) -> do - assertNoEvent eventsChan + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testConsumeEventsMultipleAcks :: (HasCallStack) => App () testConsumeEventsMultipleAcks = do @@ -155,20 +155,20 @@ testConsumeEventsMultipleAcks = do handle <- randomHandle putHandle alice handle >>= assertSuccess - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId - deliveryTag <- assertEvent eventsChan $ \e -> do + deliveryTag <- assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.update" e %. "data.event.payload.0.user.handle" `shouldMatch` handle e %. "data.delivery_tag" - sendAck ackChan deliveryTag True + sendAck ws deliveryTag True - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _) -> do - assertNoEvent eventsChan + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testConsumeEventsAckNewEventWithoutAckingOldOne :: (HasCallStack) => App () testConsumeEventsAckNewEventWithoutAckingOldOne = do @@ -180,30 +180,30 @@ testConsumeEventsAckNewEventWithoutAckingOldOne = do handle <- randomHandle putHandle alice handle >>= assertSuccess - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId - deliveryTagHandleAdd <- assertEvent eventsChan $ \e -> do + deliveryTagHandleAdd <- assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.update" e %. "data.event.payload.0.user.handle" `shouldMatch` handle e %. "data.delivery_tag" -- Only ack the handle add delivery tag - sendAck ackChan deliveryTagHandleAdd False + sendAck ws deliveryTagHandleAdd False -- Expect client-add event to be delivered again. - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - deliveryTagClientAdd <- assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + deliveryTagClientAdd <- assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId e %. "data.delivery_tag" - sendAck ackChan deliveryTagClientAdd False + sendAck ws deliveryTagClientAdd False - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _) -> do - assertNoEvent eventsChan + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testEventsDeadLettered :: (HasCallStack) => App () testEventsDeadLettered = do @@ -222,22 +222,22 @@ testEventsDeadLettered = do handle1 <- randomHandle putHandle alice handle1 >>= assertSuccess - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do e %. "type" `shouldMatch` "notifications.missed" -- Until we ack the full sync, we can't get new events - ackFullSync ackChan + ackFullSync ws -- withEventsWebSocket alice clientId $ \eventsChan ackChan -> do -- Now we can see the next event - assertEvent eventsChan $ \e -> do + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.update" e %. "data.event.payload.0.user.handle" `shouldMatch` handle1 - ackEvent ackChan e + ackEvent ws e -- We've consumed the whole queue. - assertNoEvent eventsChan + assertNoEvent ws testTransientEventsDoNotTriggerDeadLetters :: (HasCallStack) => App () testTransientEventsDoNotTriggerDeadLetters = do @@ -249,14 +249,14 @@ testTransientEventsDoNotTriggerDeadLetters = do clientId <- objId client -- consume it - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - assertEvent eventsChan $ \e -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "type" `shouldMatch` "event" e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` clientId deliveryTag <- e %. "data.delivery_tag" - sendAck ackChan deliveryTag False + sendAck ws deliveryTag False -- Self conv ID is same as user's ID, we'll use this to send typing -- indicators, so we don't have to create another conv. @@ -264,8 +264,8 @@ testTransientEventsDoNotTriggerDeadLetters = do -- Typing status is transient, currently no one is listening. sendTypingStatus alice selfConvId "started" >>= assertSuccess - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, _ackChan) -> do - assertNoEvent eventsChan + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testTransientEvents :: (HasCallStack) => App () testTransientEvents = do @@ -278,14 +278,14 @@ testTransientEvents = do -- indicators, so we don't have to create another conv. selfConvId <- objQidObject alice - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do - consumeAllEvents eventsChan ackChan + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + consumeAllEvents ws sendTypingStatus alice selfConvId "started" >>= assertSuccess - assertEvent eventsChan $ \e -> do + assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing" e %. "data.event.payload.0.qualified_conversation" `shouldMatch` selfConvId deliveryTag <- e %. "data.delivery_tag" - sendAck ackChan deliveryTag False + sendAck ws deliveryTag False handle1 <- randomHandle putHandle alice handle1 >>= assertSuccess @@ -298,15 +298,14 @@ testTransientEvents = do -- We shouldn't see the stopped typing status because we were not connected to -- the websocket when it was sent. The other events should still show up in -- order. - runCodensity (createEventsWebSocket alice clientId) $ \(eventsChan, ackChan) -> do + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do for_ [handle1, handle2] $ \handle -> - assertEvent eventsChan $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing" - e %. "data.event.payload.0.qualified_conversation" `shouldMatch` selfConvId - deliveryTag <- e %. "data.delivery_tag" - sendAck ackChan deliveryTag False + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.handle" `shouldMatch` handle + ackEvent ws e - assertNoEvent eventsChan + assertNoEvent ws testChannelLimit :: (HasCallStack) => App () testChannelLimit = withModifiedBackend @@ -327,23 +326,28 @@ testChannelLimit = withModifiedBackend lowerCodensity $ do acks <- for clients $ \c -> do - (events0, ack0) <- createEventsWebSocket alice c - e <- Codensity $ \k -> assertEvent events0 k + ws <- createEventsWebSocket alice c + e <- Codensity $ \k -> assertEvent ws k lift $ do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` c tag <- e %. "data.delivery_tag" - pure (sendAck ack0 tag False) + pure (sendAck ws tag False) lift $ sequenceA_ acks ---------------------------------------------------------------------- -- helpers +data EventWebSocket = EventWebSocket + { events :: MVar Value, + ack :: MVar (Maybe Value) + } + createEventsWebSocket :: (HasCallStack, MakesValue uid) => uid -> String -> - Codensity App (MVar Value, MVar (Maybe Value)) + Codensity App EventWebSocket createEventsWebSocket user cid = do eventsChan <- liftIO newEmptyMVar ackChan <- liftIO newEmptyMVar @@ -368,11 +372,13 @@ createEventsWebSocket user cid = do Nothing -> error $ "Failed to decode events: " ++ show bs - wsWrite conn = forever $ do + wsWrite conn = do mAck <- takeMVar ackChan case mAck of Nothing -> WS.sendClose conn (Text.pack "") - Just ack -> WS.sendBinaryData conn (encode ack) + Just ack -> + WS.sendBinaryData conn (encode ack) + >> wsWrite conn wsThread <- Codensity $ \k -> do withAsync @@ -388,24 +394,24 @@ createEventsWebSocket user cid = do k Codensity $ \k -> - k (eventsChan, ackChan) `finally` do + k (EventWebSocket eventsChan ackChan) `finally` do putMVar ackChan Nothing liftIO $ wait wsThread -ackFullSync :: (HasCallStack) => MVar (Maybe Value) -> App () -ackFullSync ackChan = - putMVar ackChan +ackFullSync :: (HasCallStack) => EventWebSocket -> App () +ackFullSync ws = + putMVar ws.ack $ Just (object ["type" .= "ack_full_sync"]) -ackEvent :: (HasCallStack) => MVar (Maybe Value) -> Value -> App () -ackEvent ackChan event = do +ackEvent :: (HasCallStack) => EventWebSocket -> Value -> App () +ackEvent ws event = do deliveryTag <- event %. "data.delivery_tag" - sendAck ackChan deliveryTag False + sendAck ws deliveryTag False -sendAck :: (HasCallStack) => MVar (Maybe Value) -> Value -> Bool -> App () -sendAck ackChan deliveryTag multiple = +sendAck :: (HasCallStack) => EventWebSocket -> Value -> Bool -> App () +sendAck ws deliveryTag multiple = do - putMVar ackChan + putMVar $ ws.ack $ Just $ object [ "type" .= "ack", @@ -416,27 +422,27 @@ sendAck ackChan deliveryTag multiple = ] ] -assertEvent :: (HasCallStack) => MVar Value -> ((HasCallStack) => Value -> App a) -> App a -assertEvent eventsChan expectations = do - timeout 10_000_000 (takeMVar eventsChan) >>= \case - Nothing -> assertFailure "No event received for 1s" +assertEvent :: (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a +assertEvent ws expectations = do + timeout 10_000_000 (takeMVar ws.events) >>= \case + Nothing -> assertFailure "No event received for 10s" Just e -> do pretty <- prettyJSON e addFailureContext ("event:\n" <> pretty) $ expectations e -assertNoEvent :: (HasCallStack) => MVar Value -> App () -assertNoEvent eventsChan = do - timeout 1_000_000 (takeMVar eventsChan) >>= \case +assertNoEvent :: (HasCallStack) => EventWebSocket -> App () +assertNoEvent ws = do + timeout 1_000_000 (takeMVar ws.events) >>= \case Nothing -> pure () Just e -> do eventJSON <- prettyJSON e assertFailure $ "Did not expect event: \n" <> eventJSON -consumeAllEvents :: MVar Value -> MVar (Maybe Value) -> App () -consumeAllEvents eventsChan ackChan = do - timeout 1_000_000 (takeMVar eventsChan) >>= \case +consumeAllEvents :: EventWebSocket -> App () +consumeAllEvents ws = do + timeout 1_000_000 (takeMVar ws.events) >>= \case Nothing -> pure () Just e -> do - ackEvent ackChan e - consumeAllEvents eventsChan ackChan + ackEvent ws e + consumeAllEvents ws From 517989d48cdeec549af63e8d212e5ec7d6efac0a Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Thu, 21 Nov 2024 16:14:47 +0100 Subject: [PATCH 12/31] Fix channel limit test --- integration/test/Test/Events.hs | 45 ++++++++++++++------------ services/cannon/src/Cannon/RabbitMq.hs | 3 +- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 517c0ab2dca..be6f840d9af 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -317,7 +317,7 @@ testChannelLimit = withModifiedBackend ) $ \domain -> do alice <- randomUser domain def - clients <- + (client0 : clients) <- replicateM 3 $ addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 @@ -325,21 +325,24 @@ testChannelLimit = withModifiedBackend >>= asString lowerCodensity $ do - acks <- for clients $ \c -> do + for_ clients $ \c -> do ws <- createEventsWebSocket alice c e <- Codensity $ \k -> assertEvent ws k lift $ do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` c - tag <- e %. "data.delivery_tag" - pure (sendAck ws tag False) - lift $ sequenceA_ acks + e %. "data.delivery_tag" + + -- the first client fails to connect because the server runs out of channels + do + ws <- createEventsWebSocket alice client0 + lift $ assertNoEvent ws ---------------------------------------------------------------------- -- helpers data EventWebSocket = EventWebSocket - { events :: MVar Value, + { events :: Chan (Either WS.ConnectionException Value), ack :: MVar (Maybe Value) } @@ -349,7 +352,7 @@ createEventsWebSocket :: String -> Codensity App EventWebSocket createEventsWebSocket user cid = do - eventsChan <- liftIO newEmptyMVar + eventsChan <- liftIO newChan ackChan <- liftIO newEmptyMVar serviceMap <- lift $ getServiceMap =<< objDomain user uid <- lift $ objId =<< objQidObject user @@ -358,17 +361,13 @@ createEventsWebSocket user cid = do caHdrs = [(fromString "Z-User", toByteString' uid)] app conn = race_ - ( wsRead conn `catch` \(e :: WS.ConnectionException) -> - case e of - WS.CloseRequest {} -> pure () - _ -> throwIO e - ) + (wsRead conn `catch` (writeChan eventsChan . Left)) (wsWrite conn) wsRead conn = forever $ do bs <- WS.receiveData conn case decodeStrict' bs of - Just n -> putMVar eventsChan n + Just n -> writeChan eventsChan (Right n) Nothing -> error $ "Failed to decode events: " ++ show bs @@ -424,25 +423,31 @@ sendAck ws deliveryTag multiple = assertEvent :: (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a assertEvent ws expectations = do - timeout 10_000_000 (takeMVar ws.events) >>= \case - Nothing -> assertFailure "No event received for 10s" - Just e -> do + timeout 10_000_000 (readChan ws.events) >>= \case + Nothing -> assertFailure "No event received for 1s" + Just (Left _) -> assertFailure "Websocket closed when waiting for more events" + Just (Right e) -> do pretty <- prettyJSON e addFailureContext ("event:\n" <> pretty) $ expectations e assertNoEvent :: (HasCallStack) => EventWebSocket -> App () assertNoEvent ws = do - timeout 1_000_000 (takeMVar ws.events) >>= \case + timeout 1_000_000 (readChan ws.events) >>= \case Nothing -> pure () - Just e -> do + Just (Left _) -> pure () + Just (Right e) -> do eventJSON <- prettyJSON e assertFailure $ "Did not expect event: \n" <> eventJSON consumeAllEvents :: EventWebSocket -> App () consumeAllEvents ws = do - timeout 1_000_000 (takeMVar ws.events) >>= \case + timeout 1_000_000 (readChan ws.events) >>= \case Nothing -> pure () - Just e -> do + Just (Left e) -> + assertFailure + $ "Websocket closed while consuming all events: " + <> displayException e + Just (Right e) -> do ackEvent ws e consumeAllEvents ws diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 38169635710..5ed8f72d48c 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -191,7 +191,8 @@ createChannel pool queue = do Codensity $ \k -> do let chan = RabbitMqChannel {inner = inner, msgVar = msgVar} finally (k chan) $ - putMVar closedVar False + -- TODO: this might cause a channel leak + tryPutMVar closedVar False acquireConnection :: RabbitMqPool -> IO PooledConnection acquireConnection pool = do From 1398d6beb88b3109784ac36687c0783e10b71428 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Wed, 20 Nov 2024 10:17:28 +0100 Subject: [PATCH 13/31] Fix potential channel leak --- services/cannon/src/Cannon/RabbitMq.hs | 38 ++++++++++---------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 5ed8f72d48c..c3b8513cb10 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -167,32 +167,24 @@ createChannel pool queue = do putMVar closedVar retry let manageChannel = do - conn <- acquireConnection pool - chan <- Q.openChannel conn.inner - Q.addChannelExceptionHandler chan handleException - putMVar inner chan - void $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do - putMVar msgVar (Just (message, envelope)) - - retry <- takeMVar closedVar - - -- TODO: releaseConnection - - -- TODO: put this in a bracket - catch (Q.closeChannel chan) $ \(_ :: SomeException) -> pure () + retry <- lowerCodensity $ do + conn <- Codensity $ bracket (acquireConnection pool) (releaseConnection pool) + chan <- Codensity $ bracket (Q.openChannel conn.inner) $ \c -> + catch (Q.closeChannel c) $ \(_ :: SomeException) -> pure () + liftIO $ Q.addChannelExceptionHandler chan handleException + putMVar inner chan + void $ liftIO $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do + putMVar msgVar (Just (message, envelope)) + takeMVar closedVar when retry manageChannel - lift . void . mask_ $ - async $ - catch manageChannel handleException - `finally` putMVar msgVar Nothing - - Codensity $ \k -> do - let chan = RabbitMqChannel {inner = inner, msgVar = msgVar} - finally (k chan) $ - -- TODO: this might cause a channel leak - tryPutMVar closedVar False + void $ + Codensity $ + withAsync $ + catch manageChannel handleException + `finally` putMVar msgVar Nothing + pure RabbitMqChannel {inner = inner, msgVar = msgVar} acquireConnection :: RabbitMqPool -> IO PooledConnection acquireConnection pool = do From 0f0bc53226308d775e0f52cbbd79d0ab1af17fbc Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Thu, 21 Nov 2024 09:14:20 +0100 Subject: [PATCH 14/31] Minor cleanups --- services/cannon/src/Cannon/RabbitMq.hs | 33 ++++++++++++-------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index c3b8513cb10..f0a9cdafde1 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -17,6 +17,7 @@ import Control.Concurrent.Async import Control.Exception import Control.Monad.Codensity import Control.Monad.Trans.Except +import Control.Monad.Trans.Maybe import Control.Retry import Data.List.Extra import Imports @@ -41,7 +42,6 @@ data PooledConnection = PooledConnection data RabbitMqPool = RabbitMqPool { opts :: RabbitMqPoolOptions, nextId :: TVar Word64, - -- TODO: use a priority queue connections :: TVar [PooledConnection], logger :: Logger, deadVar :: MVar () @@ -62,8 +62,8 @@ createRabbitMqPool opts logger = Codensity $ bracket create destroy atomically $ (,) <$> newTVar 0 <*> newTVar [] let pool = RabbitMqPool {..} - -- -- create one connection - -- void $ createConnection pool + -- create one connection + void $ createConnection pool pure pool destroy pool = putMVar pool.deadVar () @@ -159,8 +159,8 @@ createChannel pool queue = do pure False (_, Just (Q.ConnectionClosedException {})) -> do Log.info pool.logger $ - Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel") - pure False -- TODO: change this to true? + Log.msg (Log.val "RabbitMQ connection was closed unexpectedly") + pure True _ -> do logException pool.logger "RabbitMQ channel closed" e pure True @@ -214,19 +214,16 @@ acquireConnection pool = do pure pconn' findConnection :: RabbitMqPool -> IO (Maybe PooledConnection) -findConnection pool = (>>= either throwIO pure) . atomically . runExceptT $ do - -- TODO: use MaybeT - conns <- lift $ readTVar pool.connections - if null conns - then pure Nothing - else do - let pconn = minimumOn (.numChannels) conns - if pconn.numChannels >= pool.opts.maxChannels - then - if length conns >= pool.opts.maxConnections - then throwE TooManyChannels - else pure Nothing - else pure (Just pconn) +findConnection pool = (>>= either throwIO pure) . atomically . runExceptT . runMaybeT $ do + conns <- lift . lift $ readTVar pool.connections + guard (notNull conns) + + let pconn = minimumOn (.numChannels) conns + when (pconn.numChannels >= pool.opts.maxChannels) $ + if length conns >= pool.opts.maxConnections + then lift $ throwE TooManyChannels + else mzero + pure pconn releaseConnection :: RabbitMqPool -> PooledConnection -> IO () releaseConnection pool conn = atomically $ do From 80c8d3c99e6b0df3b53afee69591671ddd287437 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Thu, 21 Nov 2024 09:21:33 +0100 Subject: [PATCH 15/31] Remove websocket read thread --- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 51 ++++++------------- 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 2ea537662c9..543f1f83573 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -84,10 +84,10 @@ rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp rabbitMQWebSocketApp uid cid e pendingConn = do wsVar <- newEmptyMVar - bracket (openWebSocket wsVar) closeWebSocket $ \(wsConn, _) -> + bracket openWebSocket closeWebSocket $ \wsConn -> ( do - sendFullSyncMessageIfNeeded wsVar wsConn uid cid e - sendNotifications wsConn wsVar + sendFullSyncMessageIfNeeded wsConn uid cid e + sendNotifications wsConn ) `catches` [ handleClientMisbehaving wsConn, handleWebSocketExceptions wsConn, @@ -98,25 +98,11 @@ rabbitMQWebSocketApp uid cid e pendingConn = do Log.field "user" (idToText uid) . Log.field "client" (clientToText cid) - openWebSocket wsVar = do - wsConn <- - acceptRequest pendingConn - `catch` rejectOnError pendingConn - -- start a reader thread for client messages - -- this needs to run asynchronously in order to promptly react to - -- client-side connection termination - -- TODO: read websocket directly instead of using an MVar? - a <- async $ forever $ do - catch - ( do - msg <- getClientMessage wsConn - putMVar wsVar (Right msg) - ) - $ \err -> putMVar wsVar (Left err) - pure (wsConn, a) + openWebSocket = + acceptRequest pendingConn + `catch` rejectOnError pendingConn - closeWebSocket (wsConn, a) = do - cancel a + closeWebSocket wsConn = do logCloseWebsocket -- ignore any exceptions when sending the close message void . try @SomeException $ WS.sendClose wsConn ("" :: ByteString) @@ -182,11 +168,8 @@ rabbitMQWebSocketApp uid cid e pendingConn = do WS.sendCloseCode wsConn 1003 ("internal-error" :: ByteString) throwIO err - sendNotifications :: - WS.Connection -> - MVar (Either ConnectionException MessageClientToServer) -> - IO () - sendNotifications wsConn wsVar = lowerCodensity $ do + sendNotifications :: WS.Connection -> IO () + sendNotifications wsConn = lowerCodensity $ do chan <- createChannel e.pool (clientNotificationQueueName uid cid) let consumeRabbitMq = forever $ do @@ -196,10 +179,9 @@ rabbitMQWebSocketApp uid cid e pendingConn = do logSendFailure err throwIO err - -- get ack from wsVar and forward to rabbitmq + -- get ack from websocket and forward to rabbitmq let consumeWebsocket = forever $ do - v <- takeMVar wsVar - either throwIO pure v >>= \case + getClientMessage wsConn >>= \case AckFullSync -> throwIO UnexpectedAck AckMessage ackData -> do logAckReceived ackData @@ -248,16 +230,15 @@ rabbitMQWebSocketApp uid cid e pendingConn = do -- | Check if client has missed messages. If so, send a full synchronisation -- message and wait for the corresponding ack. sendFullSyncMessageIfNeeded :: - MVar (Either ConnectionException MessageClientToServer) -> WS.Connection -> UserId -> ClientId -> Env -> IO () -sendFullSyncMessageIfNeeded wsVar wsConn uid cid env = do +sendFullSyncMessageIfNeeded wsConn uid cid env = do row <- C.runClient env.cassandra do retry x5 $ query1 q (params LocalQuorum (uid, cid)) - for_ row $ \_ -> sendFullSyncMessage uid cid wsVar wsConn env + for_ row $ \_ -> sendFullSyncMessage uid cid wsConn env where q :: PrepQuery R (UserId, ClientId) (Identity (Maybe UserId)) q = @@ -268,15 +249,13 @@ sendFullSyncMessageIfNeeded wsVar wsConn uid cid env = do sendFullSyncMessage :: UserId -> ClientId -> - MVar (Either ConnectionException MessageClientToServer) -> WS.Connection -> Env -> IO () -sendFullSyncMessage uid cid wsVar wsConn env = do +sendFullSyncMessage uid cid wsConn env = do let event = encode EventFullSync WS.sendBinaryData wsConn event - res <- takeMVar wsVar >>= either throwIO pure - case res of + getClientMessage wsConn >>= \case AckMessage _ -> throwIO UnexpectedAck AckFullSync -> C.runClient env.cassandra do From 5a951e042fd0184223c08501ddd477d09e1b3ae6 Mon Sep 17 00:00:00 2001 From: Stefan Berthold Date: Thu, 21 Nov 2024 16:23:36 +0000 Subject: [PATCH 16/31] Catch AsyncCancelled when RabbitMQ channel is closed --- services/cannon/src/Cannon/RabbitMq.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index f0a9cdafde1..bdcc1d6fbbf 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -162,7 +162,8 @@ createChannel pool queue = do Log.msg (Log.val "RabbitMQ connection was closed unexpectedly") pure True _ -> do - logException pool.logger "RabbitMQ channel closed" e + unless (fromException e == Just AsyncCancelled) $ + logException pool.logger "RabbitMQ channel closed" e pure True putMVar closedVar retry From b3acc81f8b79f909c683da95068c4e7a85f511c4 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Fri, 22 Nov 2024 08:50:02 +0100 Subject: [PATCH 17/31] Remove more spurious logs --- .../BackendDeadUserNotificationWatcher.hs | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/services/background-worker/src/Wire/BackendDeadUserNotificationWatcher.hs b/services/background-worker/src/Wire/BackendDeadUserNotificationWatcher.hs index ad8c3c38254..1d24ab05c6e 100644 --- a/services/background-worker/src/Wire/BackendDeadUserNotificationWatcher.hs +++ b/services/background-worker/src/Wire/BackendDeadUserNotificationWatcher.hs @@ -99,15 +99,25 @@ startWorker amqp = do -- If the mvar is filled with a connection, we know the connection itself is fine, -- so we only need to re-open the channel let openConnection connM = do + -- keep track of whether the connection is being closed normally + closingRef <- newIORef False + mConn <- lowerCodensity $ do conn <- case connM of Nothing -> do -- Open the rabbit mq connection - conn <- Codensity $ bracket (liftIO $ Q.openConnection'' connOpts) (liftIO . Q.closeConnection) + conn <- Codensity + $ bracket + (liftIO $ Q.openConnection'' connOpts) + $ \conn -> do + writeIORef closingRef True + liftIO $ Q.closeConnection conn -- We need to recover from connection closed by restarting it liftIO $ Q.addConnectionClosedHandler conn True do - Log.err env.logger $ - Log.msg (Log.val "BackendDeadUserNoticationWatcher: Connection closed.") + closing <- readIORef closingRef + unless closing $ do + Log.err env.logger $ + Log.msg (Log.val "BackendDeadUserNoticationWatcher: Connection closed.") putMVar mVar Nothing runAppT env $ markAsNotWorking BackendDeadUserNoticationWatcher pure conn @@ -118,9 +128,10 @@ startWorker amqp = do -- If the channel stops, we need to re-open liftIO $ Q.addChannelExceptionHandler chan $ \e -> do - Log.err env.logger $ - Log.msg (Log.val "BackendDeadUserNoticationWatcher: Caught exception in RabbitMQ channel.") - . Log.field "exception" (displayException e) + unless (Q.isNormalChannelClose e) $ + Log.err env.logger $ + Log.msg (Log.val "BackendDeadUserNoticationWatcher: Caught exception in RabbitMQ channel.") + . Log.field "exception" (displayException e) runAppT env $ markAsNotWorking BackendDeadUserNoticationWatcher putMVar mVar (Just conn) From 8a1a30e2b97c2b396bf9aef8776307890f47db10 Mon Sep 17 00:00:00 2001 From: Stefan Berthold Date: Fri, 22 Nov 2024 15:12:59 +0000 Subject: [PATCH 18/31] Keep track of channels in ConnectionPool --- services/cannon/cannon.cabal | 2 +- services/cannon/default.nix | 1 + services/cannon/src/Cannon/RabbitMq.hs | 100 +++++++++--------- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 11 +- services/cannon/src/Cannon/Types.hs | 2 +- services/cannon/src/Cannon/WS.hs | 6 +- 6 files changed, 60 insertions(+), 62 deletions(-) diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index 2e817ba9c6f..a2d159a7041 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -90,6 +90,7 @@ library , bytestring-conversion >=0.2 , cassandra-util , conduit >=1.3.4.2 + , containers , data-timeout >=0.3 , exceptions >=0.6 , extended @@ -110,7 +111,6 @@ library , servant-conduit , servant-server , strict >=0.3.2 - , temporary , text >=1.1 , tinylog >=0.10 , transformers diff --git a/services/cannon/default.nix b/services/cannon/default.nix index 80ad8b8e3ca..c074a02a1ef 100644 --- a/services/cannon/default.nix +++ b/services/cannon/default.nix @@ -13,6 +13,7 @@ , bytestring-conversion , cassandra-util , conduit +, containers , criterion , data-timeout , exceptions diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index bdcc1d6fbbf..bae14989d22 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -20,6 +20,7 @@ import Control.Monad.Trans.Except import Control.Monad.Trans.Maybe import Control.Retry import Data.List.Extra +import Data.Map qualified as Map import Imports import Network.AMQP qualified as Q import Network.AMQP.Extended @@ -33,16 +34,16 @@ data RabbitMqPoolException instance Exception RabbitMqPoolException -data PooledConnection = PooledConnection +data PooledConnection key = PooledConnection { connId :: Word64, inner :: Q.Connection, - numChannels :: !Int + channels :: !(Map key Q.Channel) } -data RabbitMqPool = RabbitMqPool +data RabbitMqPool key = RabbitMqPool { opts :: RabbitMqPoolOptions, nextId :: TVar Word64, - connections :: TVar [PooledConnection], + connections :: TVar [PooledConnection key], logger :: Logger, deadVar :: MVar () } @@ -53,7 +54,7 @@ data RabbitMqPoolOptions = RabbitMqPoolOptions endpoint :: AmqpEndpoint } -createRabbitMqPool :: RabbitMqPoolOptions -> Logger -> Codensity IO RabbitMqPool +createRabbitMqPool :: (Ord key) => RabbitMqPoolOptions -> Logger -> Codensity IO (RabbitMqPool key) createRabbitMqPool opts logger = Codensity $ bracket create destroy where create = do @@ -67,7 +68,7 @@ createRabbitMqPool opts logger = Codensity $ bracket create destroy pure pool destroy pool = putMVar pool.deadVar () -createConnection :: RabbitMqPool -> IO PooledConnection +createConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key) createConnection pool = mask_ $ do conn <- openConnection pool pconn <- atomically $ do @@ -76,7 +77,7 @@ createConnection pool = mask_ $ do let c = PooledConnection { connId = connId, - numChannels = 0, + channels = mempty, inner = conn } modifyTVar pool.connections (c :) @@ -101,7 +102,7 @@ createConnection pool = mask_ $ do putMVar closedVar () pure pconn -openConnection :: RabbitMqPool -> IO Q.Connection +openConnection :: RabbitMqPool key -> IO Q.Connection openConnection pool = do (username, password) <- readCredsFromEnv recovering @@ -145,8 +146,8 @@ ackMessage chan deliveryTag multiple = do inner <- readMVar chan.inner Q.ackMsg inner deliveryTag multiple -createChannel :: RabbitMqPool -> Text -> Codensity IO RabbitMqChannel -createChannel pool queue = do +createChannel :: (Ord key) => RabbitMqPool key -> Text -> key -> Codensity IO RabbitMqChannel +createChannel pool queue key = do closedVar <- lift newEmptyMVar inner <- lift newEmptyMVar msgVar <- lift newEmptyMVar @@ -169,14 +170,23 @@ createChannel pool queue = do let manageChannel = do retry <- lowerCodensity $ do - conn <- Codensity $ bracket (acquireConnection pool) (releaseConnection pool) + conn <- Codensity $ bracket (acquireConnection pool) (releaseConnection pool key) chan <- Codensity $ bracket (Q.openChannel conn.inner) $ \c -> catch (Q.closeChannel c) $ \(_ :: SomeException) -> pure () - liftIO $ Q.addChannelExceptionHandler chan handleException - putMVar inner chan - void $ liftIO $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do - putMVar msgVar (Just (message, envelope)) - takeMVar closedVar + connSize <- atomically $ do + let conn' = conn {channels = Map.insert key chan conn.channels} + conns <- readTVar pool.connections + writeTVar pool.connections $! + map (\c -> if c.connId == conn'.connId then conn' else c) conns + pure $ Map.size conn'.channels + if connSize > pool.opts.maxChannels + then pure True + else do + liftIO $ Q.addChannelExceptionHandler chan handleException + putMVar inner chan + void $ liftIO $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do + putMVar msgVar (Just (message, envelope)) + takeMVar closedVar when retry manageChannel @@ -187,50 +197,42 @@ createChannel pool queue = do `finally` putMVar msgVar Nothing pure RabbitMqChannel {inner = inner, msgVar = msgVar} -acquireConnection :: RabbitMqPool -> IO PooledConnection +acquireConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key) acquireConnection pool = do - pconn <- - findConnection pool >>= \case - Nothing -> do - bracketOnError - ( do - conn <- createConnection pool - -- if we have too many connections at this point, give up - numConnections <- - atomically $ - length <$> readTVar pool.connections - when (numConnections > pool.opts.maxConnections) $ - throw TooManyChannels - pure conn - ) - (\conn -> Q.closeConnection conn.inner) - pure - Just conn -> pure conn - - atomically $ do - let pconn' = pconn {numChannels = succ (numChannels pconn)} - conns <- readTVar pool.connections - writeTVar pool.connections $! - map (\c -> if c.connId == pconn'.connId then pconn' else c) conns - pure pconn' - -findConnection :: RabbitMqPool -> IO (Maybe PooledConnection) -findConnection pool = (>>= either throwIO pure) . atomically . runExceptT . runMaybeT $ do + findConnection pool >>= \case + Nothing -> do + bracketOnError + ( do + conn <- createConnection pool + -- if we have too many connections at this point, give up + numConnections <- -- TODO should be moved to the body + atomically $ + length <$> readTVar pool.connections + when (numConnections > pool.opts.maxConnections) $ + throw TooManyChannels + pure conn + ) + (\conn -> Q.closeConnection conn.inner) + pure + Just conn -> pure conn + +findConnection :: RabbitMqPool key -> IO (Maybe (PooledConnection key)) +findConnection pool = (either throwIO pure <=< (atomically . runExceptT . runMaybeT)) $ do conns <- lift . lift $ readTVar pool.connections guard (notNull conns) - let pconn = minimumOn (.numChannels) conns - when (pconn.numChannels >= pool.opts.maxChannels) $ + let pconn = minimumOn (Map.size . (.channels)) $ conns + when (Map.size pconn.channels >= pool.opts.maxChannels) $ if length conns >= pool.opts.maxConnections then lift $ throwE TooManyChannels else mzero pure pconn -releaseConnection :: RabbitMqPool -> PooledConnection -> IO () -releaseConnection pool conn = atomically $ do +releaseConnection :: (Ord key) => RabbitMqPool key -> key -> PooledConnection key -> IO () +releaseConnection pool key conn = atomically $ do modifyTVar pool.connections $ map $ \c -> if c.connId == conn.connId - then c {numChannels = pred (numChannels c)} + then c {channels = Map.delete key c.channels} else c logConnectionError :: Logger -> Bool -> SomeException -> RetryStatus -> IO () diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 543f1f83573..265cd2b7d27 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -11,21 +11,17 @@ import Cannon.WS hiding (env) import Cassandra as C hiding (batch) import Control.Concurrent.Async import Control.Concurrent.Timeout -import Control.Exception (Handler (..), bracket, catch, catches, evaluate, throwIO, try) +import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try) import Control.Lens hiding ((#)) import Control.Monad.Codensity import Data.Aeson hiding (Key) import Data.Id import Data.List.Extra hiding (delete) import Data.Timeout (TimeoutUnit (..), (#)) -import Debug.Trace import Imports hiding (min, threadDelay) import Network.AMQP qualified as Q import Network.WebSockets import Network.WebSockets qualified as WS -import System.IO (hPutStrLn) -import System.IO.Temp -import System.IO.Unsafe import System.Logger qualified as Log import UnliftIO.Async (pooledMapConcurrentlyN_) import Wire.API.Event.WebSocketProtocol @@ -82,8 +78,6 @@ drainRabbitQueues e = do rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp rabbitMQWebSocketApp uid cid e pendingConn = do - wsVar <- newEmptyMVar - bracket openWebSocket closeWebSocket $ \wsConn -> ( do sendFullSyncMessageIfNeeded wsConn uid cid e @@ -170,7 +164,8 @@ rabbitMQWebSocketApp uid cid e pendingConn = do sendNotifications :: WS.Connection -> IO () sendNotifications wsConn = lowerCodensity $ do - chan <- createChannel e.pool (clientNotificationQueueName uid cid) + let key = mkKeyRabbit uid cid + chan <- createChannel e.pool (clientNotificationQueueName uid cid) key let consumeRabbitMq = forever $ do eventData <- getEventData chan diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index 2a246a49c68..0ac427ad1fa 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -53,7 +53,7 @@ import System.Logger qualified as Logger import System.Logger.Class hiding (info) import System.Random.MWC (GenIO) -connectionLimit :: Int +connectionLimit :: Int -- TODO rename to max number of buckets in Dict connectionLimit = 128 ----------------------------------------------------------------------------- diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index f5ba8cdf997..146759177af 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -87,7 +87,7 @@ import Wire.API.Presence newtype Key = Key { _key :: (ByteString, ByteString) } - deriving (Eq, Show, Hashable) + deriving (Eq, Show, Hashable, Ord) mkKey :: UserId -> ConnId -> Key mkKey u c = Key (toByteString' u, fromConnId c) @@ -155,7 +155,7 @@ data Env = Env clock :: !Clock, drainOpts :: DrainOpts, cassandra :: ClientState, - pool :: RabbitMqPool + pool :: RabbitMqPool Key } setRequestId :: RequestId -> Env -> Env @@ -203,7 +203,7 @@ env :: Clock -> DrainOpts -> ClientState -> - RabbitMqPool -> + RabbitMqPool Key -> Env env leh lp gh gp = Env leh lp (Bilge.host gh . Bilge.port gp $ empty) (RequestId defRequestId) From 932f0ba6cd799487820e66178980ece4531c8013 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Fri, 22 Nov 2024 16:32:23 +0100 Subject: [PATCH 19/31] Implement connection pool draining --- services/cannon/cannon.cabal | 1 + services/cannon/src/Cannon/RabbitMq.hs | 81 +++++++++++++++++-- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 56 ------------- services/cannon/src/Cannon/Run.hs | 13 +-- services/cannon/src/Cannon/WS.hs | 4 + 5 files changed, 88 insertions(+), 67 deletions(-) diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index a2d159a7041..25ad0624593 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -86,6 +86,7 @@ library , async >=2.0 , base >=4.6 && <5 , bilge >=0.12 + , binary , bytestring >=0.10 , bytestring-conversion >=0.2 , cassandra-util diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index bae14989d22..30f2fa5e387 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -1,4 +1,3 @@ -{-# OPTIONS -Wwarn #-} {-# LANGUAGE RecordWildCards #-} module Cannon.RabbitMq @@ -6,6 +5,7 @@ module Cannon.RabbitMq RabbitMqPoolOptions (..), RabbitMqPool, createRabbitMqPool, + drainRabbitMqPool, RabbitMqChannel (..), createChannel, getMessage, @@ -13,19 +13,25 @@ module Cannon.RabbitMq ) where +import Cannon.Options import Control.Concurrent.Async +import Control.Concurrent.Timeout import Control.Exception +import Control.Lens ((^.)) import Control.Monad.Codensity import Control.Monad.Trans.Except import Control.Monad.Trans.Maybe import Control.Retry +import Data.ByteString.Conversion import Data.List.Extra import Data.Map qualified as Map -import Imports +import Data.Timeout +import Imports hiding (threadDelay) import Network.AMQP qualified as Q import Network.AMQP.Extended import System.Logger (Logger) import System.Logger qualified as Log +import UnliftIO (pooledMapConcurrentlyN_) data RabbitMqPoolException = TooManyChannels @@ -44,6 +50,8 @@ data RabbitMqPool key = RabbitMqPool { opts :: RabbitMqPoolOptions, nextId :: TVar Word64, connections :: TVar [PooledConnection key], + -- | draining mode + draining :: TVar Bool, logger :: Logger, deadVar :: MVar () } @@ -59,19 +67,81 @@ createRabbitMqPool opts logger = Codensity $ bracket create destroy where create = do deadVar <- newEmptyMVar - (nextId, connections) <- + (nextId, connections, draining) <- atomically $ - (,) <$> newTVar 0 <*> newTVar [] + (,,) <$> newTVar 0 <*> newTVar [] <*> newTVar False let pool = RabbitMqPool {..} -- create one connection void $ createConnection pool pure pool destroy pool = putMVar pool.deadVar () +drainRabbitMqPool :: (ToByteString key) => RabbitMqPool key -> DrainOpts -> IO () +drainRabbitMqPool pool opts = do + atomically $ writeTVar pool.draining True + + channels :: [(key, Q.Channel)] <- atomically $ do + conns <- readTVar pool.connections + pure $ concat [Map.assocs c.channels | c <- conns] + let numberOfChannels = fromIntegral (length channels) + + let maxNumberOfBatches = + (opts ^. gracePeriodSeconds * 1000) + `div` (opts ^. millisecondsBetweenBatches) + computedBatchSize = numberOfChannels `div` maxNumberOfBatches + batchSize = max (opts ^. minBatchSize) computedBatchSize + + logDraining + pool.logger + numberOfChannels + batchSize + (opts ^. minBatchSize) + computedBatchSize + maxNumberOfBatches + + -- Sleep for the grace period + 1 second. If the sleep completes, it means + -- that draining didn't finish, and we should log that. + withAsync + ( do + -- Allocate 1 second more than the grace period to allow for overhead of + -- spawning threads. + liftIO $ threadDelay $ ((opts ^. gracePeriodSeconds) # Second + 1 # Second) + logExpired pool.logger (opts ^. gracePeriodSeconds) + ) + $ \_ -> do + for_ (chunksOf (fromIntegral batchSize) channels) $ \batch -> do + -- 16 was chosen with a roll of a fair dice. + void . async $ pooledMapConcurrentlyN_ 16 (closeChannel pool.logger) batch + liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond) + Log.info pool.logger $ Log.msg (Log.val "Draining complete") + where + closeChannel :: (ToByteString key) => Log.Logger -> (key, Q.Channel) -> IO () + closeChannel l (key, chan) = do + Log.info l $ + Log.msg (Log.val "closing rabbitmq channel") + . Log.field "key" (toByteString' key) + Q.closeChannel chan + + logExpired :: Log.Logger -> Word64 -> IO () + logExpired l period = do + Log.err l $ Log.msg (Log.val "Drain grace period expired") . Log.field "gracePeriodSeconds" period + + logDraining :: Log.Logger -> Word64 -> Word64 -> Word64 -> Word64 -> Word64 -> IO () + logDraining l count b minB batchSize m = do + Log.info l $ + Log.msg (Log.val "draining all rabbitmq channels") + . Log.field "numberOfChannels" count + . Log.field "computedBatchSize" b + . Log.field "minBatchSize" minB + . Log.field "batchSize" batchSize + . Log.field "maxNumberOfBatches" m + createConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key) createConnection pool = mask_ $ do conn <- openConnection pool - pconn <- atomically $ do + mpconn <- runMaybeT . atomically $ do + -- do not create new connections when in draining mode + readTVar pool.draining >>= guard . not connId <- readTVar pool.nextId writeTVar pool.nextId $! succ connId let c = @@ -82,6 +152,7 @@ createConnection pool = mask_ $ do } modifyTVar pool.connections (c :) pure c + pconn <- maybe (throwIO TooManyChannels) pure mpconn closedVar <- newEmptyMVar -- Fire and forget: the thread will terminate by itself as soon as the diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 265cd2b7d27..ede22279071 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -1,81 +1,25 @@ -{-# OPTIONS -Wwarn #-} {-# LANGUAGE RecordWildCards #-} module Cannon.RabbitMqConsumerApp where import Cannon.App (rejectOnError) -import Cannon.Dict qualified as D -import Cannon.Options import Cannon.RabbitMq import Cannon.WS hiding (env) import Cassandra as C hiding (batch) import Control.Concurrent.Async -import Control.Concurrent.Timeout import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try) import Control.Lens hiding ((#)) import Control.Monad.Codensity import Data.Aeson hiding (Key) import Data.Id -import Data.List.Extra hiding (delete) -import Data.Timeout (TimeoutUnit (..), (#)) import Imports hiding (min, threadDelay) import Network.AMQP qualified as Q import Network.WebSockets import Network.WebSockets qualified as WS import System.Logger qualified as Log -import UnliftIO.Async (pooledMapConcurrentlyN_) import Wire.API.Event.WebSocketProtocol import Wire.API.Notification -drainRabbitQueues :: Env -> IO () -drainRabbitQueues e = do - conns <- D.toList e.rabbitConnections - numberOfConns <- fromIntegral <$> D.size e.rabbitConnections - - let opts = e.drainOpts - maxNumberOfBatches = (opts ^. gracePeriodSeconds * 1000) `div` (opts ^. millisecondsBetweenBatches) - computedBatchSize = numberOfConns `div` maxNumberOfBatches - batchSize = max (opts ^. minBatchSize) computedBatchSize - - logDraining e.logg numberOfConns batchSize (opts ^. minBatchSize) computedBatchSize maxNumberOfBatches - - -- Sleeps for the grace period + 1 second. If the sleep completes, it means - -- that draining didn't finish, and we should log that. - timeoutAction <- async $ do - -- Allocate 1 second more than the grace period to allow for overhead of - -- spawning threads. - liftIO $ threadDelay $ ((opts ^. gracePeriodSeconds) # Second + 1 # Second) - logExpired e.logg (opts ^. gracePeriodSeconds) - - for_ (chunksOf (fromIntegral batchSize) conns) $ \batch -> do - -- 16 was chosen with a roll of a fair dice. - void . async $ pooledMapConcurrentlyN_ 16 (uncurry (closeConn e.logg)) batch - liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond) - cancel timeoutAction - Log.info e.logg $ Log.msg (Log.val "Draining complete") - where - closeConn :: Log.Logger -> Key -> Q.Connection -> IO () - closeConn l key conn = do - Log.info l $ - Log.msg (Log.val "closing rabbitmq connection") - . Log.field "key" (show key) - Q.closeConnection conn - void $ D.remove key e.rabbitConnections - - logExpired :: Log.Logger -> Word64 -> IO () - logExpired l period = do - Log.err l $ Log.msg (Log.val "Drain grace period expired") . Log.field "gracePeriodSeconds" period - - logDraining :: Log.Logger -> Word64 -> Word64 -> Word64 -> Word64 -> Word64 -> IO () - logDraining l count b min batchSize m = do - Log.info l $ - Log.msg (Log.val "draining all rabbitmq connections") - . Log.field "numberOfConns" count - . Log.field "computedBatchSize" b - . Log.field "minBatchSize" min - . Log.field "batchSize" batchSize - . Log.field "maxNumberOfBatches" m - rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp rabbitMQWebSocketApp uid cid e pendingConn = do bracket openWebSocket closeWebSocket $ \wsConn -> diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index e09cf7d80db..81a4c420d6a 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -27,7 +27,7 @@ import Cannon.API.Public import Cannon.App (maxPingInterval) import Cannon.Dict qualified as D import Cannon.Options -import Cannon.RabbitMqConsumerApp (drainRabbitQueues) +import Cannon.RabbitMq import Cannon.Types (Cannon, applog, clients, connectionLimit, env, mkEnv, runCannon, runCannonToServant) import Cannon.WS hiding (drainOpts, env) import Cassandra.Util (defInitCassandra) @@ -114,8 +114,9 @@ run o = lowerCodensity $ do inSpan tracer "cannon" defaultSpanArguments {kind = Otel.Server} (k ()) lift $ E.handle uncaughtExceptionHandler $ do - void $ installHandler sigTERM (signalHandler (env e) tid) Nothing - void $ installHandler sigINT (signalHandler (env e) tid) Nothing + let handler = signalHandler (env e) (o ^. drainOpts) tid + void $ installHandler sigTERM handler Nothing + void $ installHandler sigINT handler Nothing -- FUTUREWORK(@akshaymankar, @fisx): we may want to call `runSettingsWithShutdown` here, -- but it's a sensitive change, and it looks like this is closing all the websockets at -- the same time and then calling the drain script. I suspect this might be due to some @@ -132,10 +133,10 @@ run o = lowerCodensity $ do readExternal :: FilePath -> IO ByteString readExternal f = encodeUtf8 . strip . pack <$> Strict.readFile f -signalHandler :: Env -> ThreadId -> Signals.Handler -signalHandler e mainThread = CatchOnce $ do +signalHandler :: Env -> DrainOpts -> ThreadId -> Signals.Handler +signalHandler e opts mainThread = CatchOnce $ do runWS e drain - drainRabbitQueues e + drainRabbitMqPool e.pool opts throwTo mainThread SignalledToExit -- | This is called when the main thread receives the exception generated by diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index 146759177af..1653c82fbd9 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -61,6 +61,7 @@ import Control.Lens ((^.)) import Control.Monad.Catch import Control.Retry import Data.Aeson hiding (Error, Key) +import Data.Binary.Builder qualified as B import Data.ByteString.Char8 (pack) import Data.ByteString.Conversion import Data.ByteString.Lazy qualified as L @@ -95,6 +96,9 @@ mkKey u c = Key (toByteString' u, fromConnId c) mkKeyRabbit :: UserId -> ClientId -> Key mkKeyRabbit u c = Key (toByteString' u, toByteString' c) +instance ToByteString Key where + builder = B.fromByteString . key2bytes + key2bytes :: Key -> ByteString key2bytes (Key (u, c)) = u <> "." <> c From 7be78ad3dc8ef364689f2185561c361ca7532006 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Fri, 22 Nov 2024 17:09:18 +0100 Subject: [PATCH 20/31] Validate rabbitmq pool options --- services/cannon/src/Cannon/Options.hs | 12 +++++++++++- services/cannon/src/Cannon/Run.hs | 1 + 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/services/cannon/src/Cannon/Options.hs b/services/cannon/src/Cannon/Options.hs index 5206357aa08..aa1d1e8d005 100644 --- a/services/cannon/src/Cannon/Options.hs +++ b/services/cannon/src/Cannon/Options.hs @@ -40,6 +40,7 @@ module Cannon.Options minBatchSize, disabledAPIVersions, DrainOpts, + validateOpts, ) where @@ -101,13 +102,22 @@ data Opts = Opts _optsDrainOpts :: DrainOpts, _optsDisabledAPIVersions :: !(Set VersionExp), _optsCassandraOpts :: !CassandraOpts, + -- | Maximum number of rabbitmq connections. Must be strictly positive. _optsRabbitMqMaxConnections :: Int, + -- | Maximum number of rabbitmq channels per connection. Must be strictly positive. _optsRabbitMqMaxChannels :: Int } deriving (Show, Generic) makeFields ''Opts +validateOpts :: Opts -> IO () +validateOpts opts = do + when (opts._optsRabbitMqMaxConnections <= 0) $ do + fail "rabbitMqMaxConnections must be strictly positive" + when (opts._optsRabbitMqMaxChannels <= 0) $ do + fail "rabbitMqMaxChannels must be strictly positive" + instance FromJSON Opts where parseJSON = withObject "CannonOpts" $ \o -> Opts @@ -120,5 +130,5 @@ instance FromJSON Opts where <*> o .: "drainOpts" <*> o .: "disabledAPIVersions" <*> o .: "cassandra" - <*> o .:? "rabbitMqMaxConnections" .!= 30 + <*> o .:? "rabbitMqMaxConnections" .!= 1000 <*> o .:? "rabbitMqMaxChannels" .!= 300 diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index 81a4c420d6a..f6e6367fc0a 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -69,6 +69,7 @@ type CombinedAPI = CannonAPI :<|> Internal.API run :: Opts -> IO () run o = lowerCodensity $ do + lift $ validateOpts o tracer <- Codensity withTracer when (o ^. drainOpts . millisecondsBetweenBatches == 0) $ error "drainOpts.millisecondsBetweenBatches must not be set to 0." From 1b9d71ac376f8a4a88f8a8746d8117e748537555 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Fri, 22 Nov 2024 17:12:42 +0100 Subject: [PATCH 21/31] Add CHANGELOG entry --- changelog.d/5-internal/cannon-rabbitmq-pool | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5-internal/cannon-rabbitmq-pool diff --git a/changelog.d/5-internal/cannon-rabbitmq-pool b/changelog.d/5-internal/cannon-rabbitmq-pool new file mode 100644 index 00000000000..1e7eb39e5e2 --- /dev/null +++ b/changelog.d/5-internal/cannon-rabbitmq-pool @@ -0,0 +1 @@ +Introduce RabbitMQ connection pool in cannon From dfe553ac7deb049ed5b8076e8abd84c4bece592b Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 08:57:12 +0100 Subject: [PATCH 22/31] Regenerate nix packages --- services/cannon/default.nix | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/cannon/default.nix b/services/cannon/default.nix index c074a02a1ef..c62056faa23 100644 --- a/services/cannon/default.nix +++ b/services/cannon/default.nix @@ -9,6 +9,7 @@ , async , base , bilge +, binary , bytestring , bytestring-conversion , cassandra-util @@ -44,6 +45,7 @@ , tasty-quickcheck , text , tinylog +, transformers , types-common , unix , unliftio @@ -70,10 +72,12 @@ mkDerivation { async base bilge + binary bytestring bytestring-conversion cassandra-util conduit + containers data-timeout exceptions extended @@ -96,6 +100,7 @@ mkDerivation { strict text tinylog + transformers types-common unix unliftio From 8dd651a55f93fdfd2299a193143d6637925222b5 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 09:32:01 +0100 Subject: [PATCH 23/31] =?UTF-8?q?Rename=20connectionLimit=20=E2=86=92=20nu?= =?UTF-8?q?mDictSlices?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/cannon/src/Cannon/Run.hs | 8 ++++---- services/cannon/src/Cannon/Types.hs | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index f6e6367fc0a..eff9a612f81 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -28,7 +28,7 @@ import Cannon.App (maxPingInterval) import Cannon.Dict qualified as D import Cannon.Options import Cannon.RabbitMq -import Cannon.Types (Cannon, applog, clients, connectionLimit, env, mkEnv, runCannon, runCannonToServant) +import Cannon.Types hiding (Env) import Cannon.WS hiding (drainOpts, env) import Cassandra.Util (defInitCassandra) import Control.Concurrent @@ -84,9 +84,9 @@ run o = lowerCodensity $ do cassandra <- lift $ defInitCassandra (o ^. cassandraOpts) g e <- do - d1 <- D.empty connectionLimit - d2 <- D.empty connectionLimit - man <- lift $ newManager defaultManagerSettings {managerConnCount = connectionLimit} + d1 <- D.empty numDictSlices + d2 <- D.empty numDictSlices + man <- lift $ newManager defaultManagerSettings {managerConnCount = 128} rnd <- lift createSystemRandom clk <- lift mkClock mkEnv ext o cassandra g d1 d2 man rnd clk (o ^. Cannon.Options.rabbitmq) diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index 0ac427ad1fa..81773ed32a1 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -20,7 +20,7 @@ module Cannon.Types ( Env (..), Cannon, - connectionLimit, + numDictSlices, mapConcurrentlyCannon, mkEnv, runCannon, @@ -53,8 +53,8 @@ import System.Logger qualified as Logger import System.Logger.Class hiding (info) import System.Random.MWC (GenIO) -connectionLimit :: Int -- TODO rename to max number of buckets in Dict -connectionLimit = 128 +numDictSlices :: Int +numDictSlices = 128 ----------------------------------------------------------------------------- -- Cannon monad From 11b032671c9dec62d1daaa16c0e34b6113683725 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 09:56:59 +0100 Subject: [PATCH 24/31] Fix connection creation bracket --- services/cannon/src/Cannon/RabbitMq.hs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 30f2fa5e387..7bcf6a41c99 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -273,18 +273,14 @@ acquireConnection pool = do findConnection pool >>= \case Nothing -> do bracketOnError - ( do - conn <- createConnection pool - -- if we have too many connections at this point, give up - numConnections <- -- TODO should be moved to the body - atomically $ - length <$> readTVar pool.connections - when (numConnections > pool.opts.maxConnections) $ - throw TooManyChannels - pure conn - ) - (\conn -> Q.closeConnection conn.inner) - pure + (createConnection pool) + (Q.closeConnection . (.inner)) + $ \conn -> do + -- if we have too many connections at this point, give up + numConnections <- atomically $ length <$> readTVar pool.connections + when (numConnections > pool.opts.maxConnections) $ + throw TooManyChannels + pure conn Just conn -> pure conn findConnection :: RabbitMqPool key -> IO (Maybe (PooledConnection key)) From 56e26e183afe5b269c6e11424d38198a7b227e43 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 13:08:50 +0100 Subject: [PATCH 25/31] Remove unnecessary type annotation --- services/cannon/src/Cannon/RabbitMq.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 7bcf6a41c99..08b1d328593 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -80,7 +80,7 @@ drainRabbitMqPool :: (ToByteString key) => RabbitMqPool key -> DrainOpts -> IO ( drainRabbitMqPool pool opts = do atomically $ writeTVar pool.draining True - channels :: [(key, Q.Channel)] <- atomically $ do + channels <- atomically $ do conns <- readTVar pool.connections pure $ concat [Map.assocs c.channels | c <- conns] let numberOfChannels = fromIntegral (length channels) From ba6528fc1ee2fc39f0915cdf9418bb11a644e0f6 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 13:09:41 +0100 Subject: [PATCH 26/31] Replace explicit `async` with `concurrently` --- services/cannon/src/Cannon/RabbitMq.hs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 08b1d328593..d5a228bd410 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -111,8 +111,9 @@ drainRabbitMqPool pool opts = do $ \_ -> do for_ (chunksOf (fromIntegral batchSize) channels) $ \batch -> do -- 16 was chosen with a roll of a fair dice. - void . async $ pooledMapConcurrentlyN_ 16 (closeChannel pool.logger) batch - liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond) + concurrently + (pooledMapConcurrentlyN_ 16 (closeChannel pool.logger) batch) + (liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond)) Log.info pool.logger $ Log.msg (Log.val "Draining complete") where closeChannel :: (ToByteString key) => Log.Logger -> (key, Q.Channel) -> IO () From 79fb2115a8ee140db80329562ed87474a2680624 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 16:42:59 +0100 Subject: [PATCH 27/31] Add rabbitmq username and password to cannon env --- charts/cannon/templates/secret.yaml | 14 ++++++++++++++ charts/cannon/templates/statefulset.yaml | 11 +++++++++++ 2 files changed, 25 insertions(+) create mode 100644 charts/cannon/templates/secret.yaml diff --git a/charts/cannon/templates/secret.yaml b/charts/cannon/templates/secret.yaml new file mode 100644 index 00000000000..1b6f9ebd94e --- /dev/null +++ b/charts/cannon/templates/secret.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Secret +metadata: + name: cannon + labels: + app: cannon + chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" +type: Opaque +data: + rabbitmqUsername: {{ .Values.secrets.rabbitmq.username | b64enc | quote }} + rabbitmqPassword: {{ .Values.secrets.rabbitmq.password | b64enc | quote }} + diff --git a/charts/cannon/templates/statefulset.yaml b/charts/cannon/templates/statefulset.yaml index 2931ce01b90..348f06c1e31 100644 --- a/charts/cannon/templates/statefulset.yaml +++ b/charts/cannon/templates/statefulset.yaml @@ -92,6 +92,17 @@ spec: {{ toYaml .Values.resources | indent 12 }} {{- end }} - name: cannon + env: + - name: RABBITMQ_USERNAME + valueFrom: + secretKeyRef: + name: cannon + key: rabbitmqUsername + - name: RABBITMQ_PASSWORD + valueFrom: + secretKeyRef: + name: cannon + key: rabbitmqPassword image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" {{- if eq (include "includeSecurityContext" .) "true" }} securityContext: From 32bc29049b103dc087074251de69292784fb3300 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Tue, 26 Nov 2024 09:01:45 +0100 Subject: [PATCH 28/31] Fix rabbitmq CA secret in cannon --- charts/cannon/templates/configmap.yaml | 2 +- charts/cannon/templates/statefulset.yaml | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/charts/cannon/templates/configmap.yaml b/charts/cannon/templates/configmap.yaml index 99ffd6f2ede..bf085d9179f 100644 --- a/charts/cannon/templates/configmap.yaml +++ b/charts/cannon/templates/configmap.yaml @@ -29,7 +29,7 @@ data: enableTls: {{ .enableTls }} insecureSkipVerifyTls: {{ .insecureSkipVerifyTls }} {{- if .tlsCaSecretRef }} - caCert: /etc/wire/gundeck/rabbitmq-ca/{{ .tlsCaSecretRef.key }} + caCert: /etc/wire/cannon/rabbitmq-ca/{{ .tlsCaSecretRef.key }} {{- end }} {{- end }} diff --git a/charts/cannon/templates/statefulset.yaml b/charts/cannon/templates/statefulset.yaml index 348f06c1e31..9da4c685867 100644 --- a/charts/cannon/templates/statefulset.yaml +++ b/charts/cannon/templates/statefulset.yaml @@ -113,6 +113,11 @@ spec: mountPath: /etc/wire/cannon/externalHost - name: cannon-config mountPath: /etc/wire/cannon/conf + {{- if .Values.config.rabbitmq.tlsCaSecretRef }} + - name: "rabbitmq-ca" + secret: + secretName: {{ .Values.config.rabbitmq.tlsCaSecretRef.name }} + {{- end }} ports: - name: http containerPort: {{ .Values.service.internalPort }} From 564d10b2e8c715d53189304d347a57997b8c648d Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Tue, 26 Nov 2024 14:37:15 +0100 Subject: [PATCH 29/31] fixup! Fix rabbitmq CA secret in cannon --- charts/cannon/templates/statefulset.yaml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/charts/cannon/templates/statefulset.yaml b/charts/cannon/templates/statefulset.yaml index 9da4c685867..44566c78801 100644 --- a/charts/cannon/templates/statefulset.yaml +++ b/charts/cannon/templates/statefulset.yaml @@ -114,9 +114,8 @@ spec: - name: cannon-config mountPath: /etc/wire/cannon/conf {{- if .Values.config.rabbitmq.tlsCaSecretRef }} - - name: "rabbitmq-ca" - secret: - secretName: {{ .Values.config.rabbitmq.tlsCaSecretRef.name }} + - name: rabbitmq-ca + mountPath: "/etc/wire/cannon/rabbitmq-ca/" {{- end }} ports: - name: http @@ -171,3 +170,8 @@ spec: secret: secretName: {{ .Values.service.nginz.tls.secretName }} {{- end }} + {{- if .Values.config.rabbitmq.tlsCaSecretRef }} + - name: rabbitmq-ca + secret: + secretName: {{ .Values.config.rabbitmq.tlsCaSecretRef.name }} + {{- end }} From 0747ac1a801db88a65d1154733e342627105206f Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Mon, 25 Nov 2024 16:51:19 +0100 Subject: [PATCH 30/31] Remove unnecessary uses of withModifiedBackend --- integration/test/Test/Events.hs | 315 ++++++++++++++++---------------- 1 file changed, 153 insertions(+), 162 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index be6f840d9af..2400b1c9f2c 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -18,65 +18,61 @@ import SetupHelpers import Testlib.Prelude hiding (assertNoEvent) import UnliftIO hiding (handle) --- FUTUREWORK: Investigate why these tests are failing without --- `withModifiedBackend`; No events are received otherwise. testConsumeEventsOneWebSocket :: (HasCallStack) => App () testConsumeEventsOneWebSocket = do - withModifiedBackend def \domain -> do - alice <- randomUser domain def + alice <- randomUser OwnDomain def - lastNotifResp <- - retrying - (constantDelay 10_000 <> limitRetries 10) - (\_ resp -> pure $ resp.status == 404) - (\_ -> getLastNotification alice def) - lastNotifId <- lastNotifResp.json %. "id" & asString + lastNotifResp <- + retrying + (constantDelay 10_000 <> limitRetries 10) + (\_ resp -> pure $ resp.status == 404) + (\_ -> getLastNotification alice def) + lastNotifId <- lastNotifResp.json %. "id" & asString - client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - clientId <- objId client + client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + clientId <- objId client - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - deliveryTag <- assertEvent ws $ \e -> do - e %. "type" `shouldMatch` "event" - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` clientId - e %. "data.delivery_tag" - assertNoEvent ws + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + deliveryTag <- assertEvent ws $ \e -> do + e %. "type" `shouldMatch` "event" + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` clientId + e %. "data.delivery_tag" + assertNoEvent ws - sendAck ws deliveryTag False - assertNoEvent ws + sendAck ws deliveryTag False + assertNoEvent ws - handle <- randomHandle - putHandle alice handle >>= assertSuccess + handle <- randomHandle + putHandle alice handle >>= assertSuccess - assertEvent ws $ \e -> do - e %. "type" `shouldMatch` "event" - e %. "data.event.payload.0.type" `shouldMatch` "user.update" - e %. "data.event.payload.0.user.handle" `shouldMatch` handle + assertEvent ws $ \e -> do + e %. "type" `shouldMatch` "event" + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.handle" `shouldMatch` handle - -- No new notifications should be stored in Cassandra as the user doesn't have - -- any legacy clients - getNotifications alice def {since = Just lastNotifId} `bindResponse` \resp -> do - resp.status `shouldMatchInt` 200 - shouldBeEmpty $ resp.json %. "notifications" + -- No new notifications should be stored in Cassandra as the user doesn't have + -- any legacy clients + getNotifications alice def {since = Just lastNotifId} `bindResponse` \resp -> do + resp.status `shouldMatchInt` 200 + shouldBeEmpty $ resp.json %. "notifications" testConsumeEventsForDifferentUsers :: (HasCallStack) => App () testConsumeEventsForDifferentUsers = do - withModifiedBackend def $ \domain -> do - alice <- randomUser domain def - bob <- randomUser domain def + alice <- randomUser OwnDomain def + bob <- randomUser OwnDomain def - aliceClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - aliceClientId <- objId aliceClient + aliceClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + aliceClientId <- objId aliceClient - bobClient <- addClient bob def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - bobClientId <- objId bobClient + bobClient <- addClient bob def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + bobClientId <- objId bobClient - lowerCodensity $ do - aliceWS <- createEventsWebSocket alice aliceClientId - bobWS <- createEventsWebSocket bob bobClientId - lift $ assertClientAdd aliceClientId aliceWS - lift $ assertClientAdd bobClientId bobWS + lowerCodensity $ do + aliceWS <- createEventsWebSocket alice aliceClientId + bobWS <- createEventsWebSocket bob bobClientId + lift $ assertClientAdd aliceClientId aliceWS + lift $ assertClientAdd bobClientId bobWS where assertClientAdd :: (HasCallStack) => String -> EventWebSocket -> App () assertClientAdd clientId ws = do @@ -89,121 +85,117 @@ testConsumeEventsForDifferentUsers = do testConsumeEventsWhileHavingLegacyClients :: (HasCallStack) => App () testConsumeEventsWhileHavingLegacyClients = do - withModifiedBackend def $ \domain -> do - alice <- randomUser domain def - - -- Even if alice has no clients, the notifications should still be persisted - -- in Cassandra. This choice is kinda arbitrary as these notifications - -- probably don't mean much, however, it ensures backwards compatibility. - lastNotifId <- - awaitNotification alice noValue (const $ pure True) >>= \notif -> do - notif %. "payload.0.type" `shouldMatch` "user.activate" - -- There is only one notification (at the time of writing), so we assume - -- it to be the last one. - notif %. "id" & asString - - oldClient <- addClient alice def {acapabilities = Just []} >>= getJSON 201 - - withWebSocket (alice, "anything-but-conn", oldClient %. "id") $ \oldWS -> do - newClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - newClientId <- newClient %. "id" & asString + alice <- randomUser OwnDomain def - oldNotif <- awaitMatch isUserClientAddNotif oldWS - oldNotif %. "payload.0.client.id" `shouldMatch` newClientId + -- Even if alice has no clients, the notifications should still be persisted + -- in Cassandra. This choice is kinda arbitrary as these notifications + -- probably don't mean much, however, it ensures backwards compatibility. + lastNotifId <- + awaitNotification alice noValue (const $ pure True) >>= \notif -> do + notif %. "payload.0.type" `shouldMatch` "user.activate" + -- There is only one notification (at the time of writing), so we assume + -- it to be the last one. + notif %. "id" & asString - runCodensity (createEventsWebSocket alice newClientId) $ \ws -> - assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` newClientId + oldClient <- addClient alice def {acapabilities = Just []} >>= getJSON 201 - -- All notifs are also in Cassandra because of the legacy client - getNotifications alice def {since = Just lastNotifId} `bindResponse` \resp -> do - resp.status `shouldMatchInt` 200 - resp.json %. "notifications.0.payload.0.type" `shouldMatch` "user.client-add" - resp.json %. "notifications.1.payload.0.type" `shouldMatch` "user.client-add" + withWebSocket (alice, "anything-but-conn", oldClient %. "id") $ \oldWS -> do + newClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + newClientId <- newClient %. "id" & asString -testConsumeEventsAcks :: (HasCallStack) => App () -testConsumeEventsAcks = do - withModifiedBackend def $ \domain -> do - alice <- randomUser domain def - client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - clientId <- objId client + oldNotif <- awaitMatch isUserClientAddNotif oldWS + oldNotif %. "payload.0.client.id" `shouldMatch` newClientId - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + runCodensity (createEventsWebSocket alice newClientId) $ \ws -> assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` clientId + e %. "data.event.payload.0.client.id" `shouldMatch` newClientId - -- without ack, we receive the same event again - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - deliveryTag <- assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` clientId - e %. "data.delivery_tag" - sendAck ws deliveryTag False + -- All notifs are also in Cassandra because of the legacy client + getNotifications alice def {since = Just lastNotifId} `bindResponse` \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "notifications.0.payload.0.type" `shouldMatch` "user.client-add" + resp.json %. "notifications.1.payload.0.type" `shouldMatch` "user.client-add" - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - assertNoEvent ws +testConsumeEventsAcks :: (HasCallStack) => App () +testConsumeEventsAcks = do + alice <- randomUser OwnDomain def + client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + clientId <- objId client + + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` clientId + + -- without ack, we receive the same event again + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + deliveryTag <- assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` clientId + e %. "data.delivery_tag" + sendAck ws deliveryTag False + + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testConsumeEventsMultipleAcks :: (HasCallStack) => App () testConsumeEventsMultipleAcks = do - withModifiedBackend def $ \domain -> do - alice <- randomUser domain def - client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - clientId <- objId client + alice <- randomUser OwnDomain def + client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + clientId <- objId client - handle <- randomHandle - putHandle alice handle >>= assertSuccess + handle <- randomHandle + putHandle alice handle >>= assertSuccess - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` clientId + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` clientId - deliveryTag <- assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.update" - e %. "data.event.payload.0.user.handle" `shouldMatch` handle - e %. "data.delivery_tag" + deliveryTag <- assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.handle" `shouldMatch` handle + e %. "data.delivery_tag" - sendAck ws deliveryTag True + sendAck ws deliveryTag True - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - assertNoEvent ws + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testConsumeEventsAckNewEventWithoutAckingOldOne :: (HasCallStack) => App () testConsumeEventsAckNewEventWithoutAckingOldOne = do - withModifiedBackend def $ \domain -> do - alice <- randomUser domain def - client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - clientId <- objId client + alice <- randomUser OwnDomain def + client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + clientId <- objId client - handle <- randomHandle - putHandle alice handle >>= assertSuccess + handle <- randomHandle + putHandle alice handle >>= assertSuccess - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` clientId + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` clientId - deliveryTagHandleAdd <- assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.update" - e %. "data.event.payload.0.user.handle" `shouldMatch` handle - e %. "data.delivery_tag" + deliveryTagHandleAdd <- assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.handle" `shouldMatch` handle + e %. "data.delivery_tag" - -- Only ack the handle add delivery tag - sendAck ws deliveryTagHandleAdd False + -- Only ack the handle add delivery tag + sendAck ws deliveryTagHandleAdd False - -- Expect client-add event to be delivered again. - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - deliveryTagClientAdd <- assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` clientId - e %. "data.delivery_tag" + -- Expect client-add event to be delivered again. + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + deliveryTagClientAdd <- assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` clientId + e %. "data.delivery_tag" - sendAck ws deliveryTagClientAdd False + sendAck ws deliveryTagClientAdd False - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - assertNoEvent ws + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + assertNoEvent ws testEventsDeadLettered :: (HasCallStack) => App () testEventsDeadLettered = do @@ -269,43 +261,42 @@ testTransientEventsDoNotTriggerDeadLetters = do testTransientEvents :: (HasCallStack) => App () testTransientEvents = do - withModifiedBackend def $ \domain -> do - alice <- randomUser domain def - client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 - clientId <- objId client + alice <- randomUser OwnDomain def + client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 + clientId <- objId client - -- Self conv ID is same as user's ID, we'll use this to send typing - -- indicators, so we don't have to create another conv. - selfConvId <- objQidObject alice + -- Self conv ID is same as user's ID, we'll use this to send typing + -- indicators, so we don't have to create another conv. + selfConvId <- objQidObject alice - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - consumeAllEvents ws - sendTypingStatus alice selfConvId "started" >>= assertSuccess - assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing" - e %. "data.event.payload.0.qualified_conversation" `shouldMatch` selfConvId - deliveryTag <- e %. "data.delivery_tag" - sendAck ws deliveryTag False + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + consumeAllEvents ws + sendTypingStatus alice selfConvId "started" >>= assertSuccess + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing" + e %. "data.event.payload.0.qualified_conversation" `shouldMatch` selfConvId + deliveryTag <- e %. "data.delivery_tag" + sendAck ws deliveryTag False - handle1 <- randomHandle - putHandle alice handle1 >>= assertSuccess + handle1 <- randomHandle + putHandle alice handle1 >>= assertSuccess - sendTypingStatus alice selfConvId "stopped" >>= assertSuccess + sendTypingStatus alice selfConvId "stopped" >>= assertSuccess - handle2 <- randomHandle - putHandle alice handle2 >>= assertSuccess + handle2 <- randomHandle + putHandle alice handle2 >>= assertSuccess - -- We shouldn't see the stopped typing status because we were not connected to - -- the websocket when it was sent. The other events should still show up in - -- order. - runCodensity (createEventsWebSocket alice clientId) $ \ws -> do - for_ [handle1, handle2] $ \handle -> - assertEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.update" - e %. "data.event.payload.0.user.handle" `shouldMatch` handle - ackEvent ws e + -- We shouldn't see the stopped typing status because we were not connected to + -- the websocket when it was sent. The other events should still show up in + -- order. + runCodensity (createEventsWebSocket alice clientId) $ \ws -> do + for_ [handle1, handle2] $ \handle -> + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.handle" `shouldMatch` handle + ackEvent ws e - assertNoEvent ws + assertNoEvent ws testChannelLimit :: (HasCallStack) => App () testChannelLimit = withModifiedBackend From b9e2fc179dc9f67957307560467645925f0b1ba3 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Wed, 27 Nov 2024 09:47:47 +0100 Subject: [PATCH 31/31] Add rabbit-mq mount for cannon in integration --- charts/integration/templates/integration-integration.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/charts/integration/templates/integration-integration.yaml b/charts/integration/templates/integration-integration.yaml index 701c8fec634..dd351986e7b 100644 --- a/charts/integration/templates/integration-integration.yaml +++ b/charts/integration/templates/integration-integration.yaml @@ -264,6 +264,9 @@ spec: - name: rabbitmq-ca mountPath: /etc/wire/gundeck/rabbitmq-ca + - name: rabbitmq-ca + mountPath: /etc/wire/cannon/rabbitmq-ca + {{- if eq (include "useCassandraTLS" .Values.config) "true" }} - name: "integration-cassandra" mountPath: "/certs"