diff --git a/changelog.d/0-release-notes/WPB-10660 b/changelog.d/0-release-notes/WPB-10660 new file mode 100644 index 00000000000..17305b2882f --- /dev/null +++ b/changelog.d/0-release-notes/WPB-10660 @@ -0,0 +1 @@ +charts/wire-server: There is a new config value called `background-worker.config.enableFederation` which defaults to `false`. This must be kept in sync with `tags.federation`. diff --git a/changelog.d/5-internal/background-worker b/changelog.d/5-internal/background-worker new file mode 100644 index 00000000000..d699ef088a6 --- /dev/null +++ b/changelog.d/5-internal/background-worker @@ -0,0 +1 @@ +charts/wire-server: Deploy background-worker even when tags.federation is `false` diff --git a/charts/background-worker/templates/configmap.yaml b/charts/background-worker/templates/configmap.yaml index fea77ab59d5..8840a43764e 100644 --- a/charts/background-worker/templates/configmap.yaml +++ b/charts/background-worker/templates/configmap.yaml @@ -26,7 +26,9 @@ data: host: {{ .host }} port: {{ .port }} vHost: {{ .vHost }} + {{- if $.Values.config.enableFederation }} adminPort: {{ .adminPort }} + {{- end }} enableTls: {{ .enableTls }} insecureSkipVerifyTls: {{ .insecureSkipVerifyTls }} {{- if .tlsCaSecretRef }} diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index e38cd9c8225..8b79f6af6be 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -18,6 +18,7 @@ metrics: config: logLevel: Info logFormat: StructuredJSON + enableFederation: false # keep in sync with brig, cargohold and galley charts' config.enableFederation as well as wire-server chart's tags.federation rabbitmq: host: rabbitmq port: 5672 diff --git a/charts/brig/values.yaml b/charts/brig/values.yaml index 561eb6c3bbd..06da5a19401 100644 --- a/charts/brig/values.yaml +++ b/charts/brig/values.yaml @@ -67,7 +67,7 @@ config: useSES: true multiSFT: enabled: false # keep multiSFT default in sync with sft chart's multiSFT.enabled - enableFederation: false # keep enableFederation default in sync with galley and cargohold chart's config.enableFederation as well as wire-server chart's tags.federation + enableFederation: false # keep in sync with background-worker, cargohold and galley charts' config.enableFederation as well as wire-server chart's tags.federation # Not used if enableFederation is false rabbitmq: host: rabbitmq diff --git a/charts/cargohold/values.yaml b/charts/cargohold/values.yaml index 14cfaedce64..0eb8718e0ca 100644 --- a/charts/cargohold/values.yaml +++ b/charts/cargohold/values.yaml @@ -18,7 +18,7 @@ config: logLevel: Info logFormat: StructuredJSON logNetStrings: false - enableFederation: false # keep enableFederation default in sync with brig and galley chart's config.enableFederation as well as wire-server chart's tags.federation + enableFederation: false # keep in sync with background-worker, brig and galley charts' config.enableFederation as well as wire-server chart's tags.federation aws: region: "eu-west-1" s3Bucket: assets diff --git a/charts/galley/values.yaml b/charts/galley/values.yaml index 947bb42c028..821643510ad 100644 --- a/charts/galley/values.yaml +++ b/charts/galley/values.yaml @@ -33,7 +33,7 @@ config: # tlsCaSecretRef: # name: # key: - enableFederation: false # keep enableFederation default in sync with brig and cargohold chart's config.enableFederation as well as wire-server chart's tags.federation + enableFederation: false # keep in sync with background-worker, brig and cargohold charts' config.enableFederation as well as wire-server chart's tags.federation # Not used if enableFederation is false rabbitmq: host: rabbitmq diff --git a/charts/wire-server/requirements.yaml b/charts/wire-server/requirements.yaml index 2d1fafb9674..60ed93fcbe0 100644 --- a/charts/wire-server/requirements.yaml +++ b/charts/wire-server/requirements.yaml @@ -96,7 +96,6 @@ dependencies: repository: "file://../background-worker" tags: - background-worker - - federation - haskellServices - services - name: integration diff --git a/charts/wire-server/values.yaml b/charts/wire-server/values.yaml index f0488133713..55900d494fc 100644 --- a/charts/wire-server/values.yaml +++ b/charts/wire-server/values.yaml @@ -7,7 +7,7 @@ tags: legalhold: false - federation: false # see also galley.config.enableFederation and brig.config.enableFederation + federation: false # see also {background-worker, brig, cargohold, galley}.config.enableFederation backoffice: false mlsstats: false integration: false diff --git a/docs/src/understand/configure-federation.md b/docs/src/understand/configure-federation.md index 455aafd6437..d97e3182bbc 100644 --- a/docs/src/understand/configure-federation.md +++ b/docs/src/understand/configure-federation.md @@ -371,7 +371,7 @@ certificate. Read {ref}`choose-backend-domain` again, then set the backend domain three times to the same value in the subcharts cargohold, galley and brig. You also need to set `enableFederation` to -`true`. +`true` in background-worker in addition to those charts. ``` yaml # override values for wire-server @@ -393,6 +393,10 @@ cargohold: enableFederation: true settings: federationDomain: example.com # your chosen "backend domain" + +background-worker: + config: + enableFederation: true ``` (configure-federation-strategy-in-brig)= diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 2ed14739f79..2ec70eaebfc 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -6,7 +6,7 @@ tags: cannon: true cargohold: true spar: true - federation: true # also see galley.config.enableFederation and brig.config.enableFederation + federation: true # also see {background-worker,brig,cargohold,galley}.config.enableFederation backoffice: true proxy: false legalhold: false @@ -485,6 +485,7 @@ background-worker: requests: {} imagePullPolicy: {{ .Values.imagePullPolicy }} config: + enableFederation: true backendNotificationPusher: pushBackoffMinWait: 1000 # 1ms pushBackoffMaxWait: 500000 # 0.5s diff --git a/libs/extended/src/Network/AMQP/Extended.hs b/libs/extended/src/Network/AMQP/Extended.hs index b3131fce2af..3d1e79a218b 100644 --- a/libs/extended/src/Network/AMQP/Extended.hs +++ b/libs/extended/src/Network/AMQP/Extended.hs @@ -3,7 +3,7 @@ module Network.AMQP.Extended ( RabbitMqHooks (..), RabbitMqAdminOpts (..), - RabbitMqOpts (..), + AmqpEndpoint (..), openConnectionWithRetries, mkRabbitMqAdminClientEnv, mkRabbitMqChannelMVar, @@ -103,9 +103,9 @@ mkRabbitMqAdminClientEnv opts = do (either throwM pure <=< flip runClientM clientEnv) (toServant $ adminClient basicAuthData) --- | When admin opts are needed use `RabbitMqOpts Identity`, otherwise use --- `RabbitMqOpts NoAdmin`. -data RabbitMqOpts = RabbitMqOpts +-- | When admin opts are needed use `AmqpEndpoint Identity`, otherwise use +-- `AmqpEndpoint NoAdmin`. +data AmqpEndpoint = AmqpEndpoint { host :: !String, port :: !Int, vHost :: !Text, @@ -113,19 +113,19 @@ data RabbitMqOpts = RabbitMqOpts } deriving (Show) -instance FromJSON RabbitMqOpts where +instance FromJSON AmqpEndpoint where parseJSON = withObject "RabbitMqAdminOpts" $ \v -> - RabbitMqOpts + AmqpEndpoint <$> v .: "host" <*> v .: "port" <*> v .: "vHost" <*> parseTlsJson v -demoteOpts :: RabbitMqAdminOpts -> RabbitMqOpts -demoteOpts RabbitMqAdminOpts {..} = RabbitMqOpts {..} +demoteOpts :: RabbitMqAdminOpts -> AmqpEndpoint +demoteOpts RabbitMqAdminOpts {..} = AmqpEndpoint {..} -- | Useful if the application only pushes into some queues. -mkRabbitMqChannelMVar :: Logger -> RabbitMqOpts -> IO (MVar Q.Channel) +mkRabbitMqChannelMVar :: Logger -> AmqpEndpoint -> IO (MVar Q.Channel) mkRabbitMqChannelMVar l opts = do chanMVar <- newEmptyMVar connThread <- @@ -152,10 +152,10 @@ openConnectionWithRetries :: forall m. (MonadIO m, MonadMask m, MonadBaseControl IO m) => Logger -> - RabbitMqOpts -> + AmqpEndpoint -> RabbitMqHooks m -> m () -openConnectionWithRetries l RabbitMqOpts {..} hooks = do +openConnectionWithRetries l AmqpEndpoint {..} hooks = do (username, password) <- liftIO $ readCredsFromEnv connectWithRetries username password where diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index f7cfe209ad6..464c93e0cf0 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -17,8 +17,10 @@ import Imports import Network.AMQP qualified as Q import Network.AMQP.Extended import Network.AMQP.Lifted qualified as QL -import Network.RabbitMqAdmin +import Network.RabbitMqAdmin hiding (adminClient) +import Network.RabbitMqAdmin qualified as RabbitMqAdmin import Prometheus +import Servant.Client qualified as Servant import System.Logger.Class qualified as Log import UnliftIO import Wire.API.Federation.API @@ -197,8 +199,8 @@ pairedMaximumOn f = maximumBy (compare `on` snd) . map (id &&& f) -- FUTUREWORK: Recosider using 1 channel for many consumers. It shouldn't matter -- for a handful of remote domains. -- Consumers is passed in explicitly so that cleanup code has a reference to the consumer tags. -startPusher :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> AppT IO () -startPusher consumersRef chan = do +startPusher :: RabbitMqAdmin.AdminAPI (Servant.AsClientT IO) -> IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> AppT IO () +startPusher adminClient consumersRef chan = do -- This ensures that we receive notifications 1 by 1 which ensures they are -- delivered in order. markAsWorking BackendNotificationPusher @@ -221,7 +223,7 @@ startPusher consumersRef chan = do ] $ forever $ do - remotes <- getRemoteDomains + remotes <- getRemoteDomains adminClient ensureConsumers consumersRef chan remotes threadDelay timeBeforeNextRefresh @@ -259,8 +261,8 @@ ensureConsumer consumers chan domain = do -- let us come down this path if there is an old consumer. liftIO $ forM_ oldTag $ Q.cancelConsumer chan . fst -getRemoteDomains :: AppT IO [Domain] -getRemoteDomains = do +getRemoteDomains :: RabbitMqAdmin.AdminAPI (Servant.AsClientT IO) -> AppT IO [Domain] +getRemoteDomains adminClient = do -- Jittered exponential backoff with 10ms as starting delay and 60s as max -- cumulative delay. When this is reached, the operation fails. -- @@ -279,9 +281,8 @@ getRemoteDomains = do where go :: AppT IO [Domain] go = do - client <- asks rabbitmqAdminClient vhost <- asks rabbitmqVHost - queues <- liftIO $ listQueuesByVHost client vhost + queues <- liftIO $ listQueuesByVHost adminClient vhost let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes logInvalidDomain d e = @@ -290,7 +291,7 @@ getRemoteDomains = do . Log.field "queue" ("backend-notifications." <> d) . Log.field "error" e -startWorker :: RabbitMqAdminOpts -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ()))) +startWorker :: AmqpEndpoint -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ()))) startWorker rabbitmqOpts = do env <- ask -- These are used in the POSIX signal handlers, so we need to make @@ -304,22 +305,28 @@ startWorker rabbitmqOpts = do clearRefs = do atomicWriteIORef chanRef Nothing atomicWriteIORef consumersRef mempty - -- We can fire and forget this thread because it keeps respawning itself using the 'onConnectionClosedHandler'. - void $ - async $ - liftIO $ - openConnectionWithRetries env.logger (demoteOpts rabbitmqOpts) $ - RabbitMqHooks - { -- The exception handling in `openConnectionWithRetries` won't open a new - -- connection on an explicit close call. - onNewChannel = \chan -> do - atomicWriteIORef chanRef $ pure chan - runAppT env $ startPusher consumersRef chan, - onChannelException = \_ -> do - clearRefs - runAppT env $ markAsNotWorking BackendNotificationPusher, - onConnectionClose = do - clearRefs - runAppT env $ markAsNotWorking BackendNotificationPusher - } + case env.rabbitmqAdminClient of + Nothing -> + Log.info $ + Log.msg $ + Log.val "RabbitMQ admin client not available, skipping backend notification pusher." + Just client -> + -- We can fire and forget this thread because it keeps respawning itself using the 'onConnectionClosedHandler'. + void $ + async $ + liftIO $ + openConnectionWithRetries env.logger rabbitmqOpts $ + RabbitMqHooks + { -- The exception handling in `openConnectionWithRetries` won't open a new + -- connection on an explicit close call. + onNewChannel = \chan -> do + atomicWriteIORef chanRef $ pure chan + runAppT env $ startPusher client consumersRef chan, + onChannelException = \_ -> do + clearRefs + runAppT env $ markAsNotWorking BackendNotificationPusher, + onConnectionClose = do + clearRefs + runAppT env $ markAsNotWorking BackendNotificationPusher + } pure (chanRef, consumersRef) diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index 3a9bc8e298a..7c5241a6bc9 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -8,6 +8,7 @@ import Data.Metrics.Servant qualified as Metrics import Data.Text qualified as T import Imports import Network.AMQP qualified as Q +import Network.AMQP.Extended (demoteOpts) import Network.Wai.Utilities.Server import Servant import Servant.Server.Generic @@ -21,7 +22,8 @@ import Wire.BackgroundWorker.Options run :: Opts -> IO () run opts = do env <- mkEnv opts - (notifChanRef, notifConsumersRef) <- runAppT env $ BackendNotificationPusher.startWorker opts.rabbitmq + let amqpEP = either id demoteOpts opts.rabbitmq.unRabbitMqOpts + (notifChanRef, notifConsumersRef) <- runAppT env $ BackendNotificationPusher.startWorker amqpEP let -- cleanup will run in a new thread when the signal is caught, so we need to use IORefs and -- specific exception types to message threads to clean up l = logger env diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index db968315947..dcf89d56d41 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -31,7 +31,7 @@ data Worker data Env = Env { http2Manager :: Http2Manager, - rabbitmqAdminClient :: RabbitMqAdmin.AdminAPI (Servant.AsClientT IO), + rabbitmqAdminClient :: Maybe (RabbitMqAdmin.AdminAPI (Servant.AsClientT IO)), rabbitmqVHost :: Text, logger :: Logger, federatorInternal :: Endpoint, @@ -66,8 +66,8 @@ mkEnv opts = do responseTimeoutNone (\t -> responseTimeoutMicro $ 1000000 * t) -- seconds to microseconds opts.defederationTimeout - rabbitmqVHost = opts.rabbitmq.vHost - rabbitmqAdminClient <- mkRabbitMqAdminClientEnv opts.rabbitmq + rabbitmqVHost = either (.vHost) (.vHost) opts.rabbitmq.unRabbitMqOpts + rabbitmqAdminClient <- for (rightToMaybe opts.rabbitmq.unRabbitMqOpts) mkRabbitMqAdminClientEnv statuses <- newIORef $ Map.fromList diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index da31c41255a..cdbeb1e5024 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -11,7 +11,7 @@ data Opts = Opts logFormat :: !(Maybe (Last LogFormat)), backgroundWorker :: !Endpoint, federatorInternal :: !Endpoint, - rabbitmq :: !RabbitMqAdminOpts, + rabbitmq :: !RabbitMqOpts, -- | Seconds, Nothing for no timeout defederationTimeout :: Maybe Int, backendNotificationPusher :: BackendNotificationsConfig @@ -37,3 +37,13 @@ data BackendNotificationsConfig = BackendNotificationsConfig deriving (Show, Generic) instance FromJSON BackendNotificationsConfig + +newtype RabbitMqOpts = RabbitMqOpts {unRabbitMqOpts :: Either AmqpEndpoint RabbitMqAdminOpts} + deriving (Show) + +instance FromJSON RabbitMqOpts where + parseJSON v = + RabbitMqOpts + <$> ( (Right <$> parseJSON v) + <|> (Left <$> parseJSON v) + ) diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 351f38f7c4a..29906684cae 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -270,13 +270,13 @@ spec = do let federatorInternal = Endpoint "localhost" 8097 http2Manager = undefined statuses = undefined - rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin + rabbitmqAdminClient = Just $ mockRabbitMqAdminClient mockAdmin rabbitmqVHost = "test-vhost" defederationTimeout = responseTimeoutNone backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 backendNotificationMetrics <- mkBackendNotificationMetrics - domains <- runAppT Env {..} getRemoteDomains + domains <- runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient) domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"] readTVarIO mockAdmin.listQueuesVHostCalls `shouldReturn` ["test-vhost"] @@ -287,12 +287,12 @@ spec = do let federatorInternal = Endpoint "localhost" 8097 http2Manager = undefined statuses = undefined - rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin + rabbitmqAdminClient = Just $ mockRabbitMqAdminClient mockAdmin rabbitmqVHost = "test-vhost" defederationTimeout = responseTimeoutNone backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 backendNotificationMetrics <- mkBackendNotificationMetrics - domainsThread <- async $ runAppT Env {..} getRemoteDomains + domainsThread <- async $ runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient) -- Wait for first call untilM (readTVarIO mockAdmin.listQueuesVHostCalls >>= \calls -> pure $ not $ null calls) diff --git a/services/brig/src/Brig/Options.hs b/services/brig/src/Brig/Options.hs index a5a1f761fb6..10cfef98b9e 100644 --- a/services/brig/src/Brig/Options.hs +++ b/services/brig/src/Brig/Options.hs @@ -390,7 +390,7 @@ data Opts = Opts -- | SFT Federation multiSFT :: !(Maybe Bool), -- | RabbitMQ settings, required when federation is enabled. - rabbitmq :: !(Maybe RabbitMqOpts), + rabbitmq :: !(Maybe AmqpEndpoint), -- | AWS settings aws :: !AWSOpts, -- | Enable Random Prekey Strategy diff --git a/services/galley/src/Galley/Options.hs b/services/galley/src/Galley/Options.hs index 3e1b97aa1cf..1d602e420f3 100644 --- a/services/galley/src/Galley/Options.hs +++ b/services/galley/src/Galley/Options.hs @@ -182,7 +182,7 @@ data Opts = Opts -- | Federator endpoint _federator :: !(Maybe Endpoint), -- | RabbitMQ settings, required when federation is enabled. - _rabbitmq :: !(Maybe RabbitMqOpts), + _rabbitmq :: !(Maybe AmqpEndpoint), -- | Disco URL _discoUrl :: !(Maybe Text), -- | Other settings