Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/0-release-notes/WPB-10660
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
charts/wire-server: There is a new config value called `background-worker.config.enableFederation` which defaults to `false`. This must be kept in sync with `tags.federation`.
1 change: 1 addition & 0 deletions changelog.d/5-internal/background-worker
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
charts/wire-server: Deploy background-worker even when tags.federation is `false`
2 changes: 2 additions & 0 deletions charts/background-worker/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ data:
host: {{ .host }}
port: {{ .port }}
vHost: {{ .vHost }}
{{- if $.Values.config.enableFederation }}
adminPort: {{ .adminPort }}
{{- end }}
enableTls: {{ .enableTls }}
insecureSkipVerifyTls: {{ .insecureSkipVerifyTls }}
{{- if .tlsCaSecretRef }}
Expand Down
1 change: 1 addition & 0 deletions charts/background-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ metrics:
config:
logLevel: Info
logFormat: StructuredJSON
enableFederation: false # keep in sync with brig, cargohold and galley charts' config.enableFederation as well as wire-server chart's tags.federation
rabbitmq:
host: rabbitmq
port: 5672
Expand Down
2 changes: 1 addition & 1 deletion charts/brig/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ config:
useSES: true
multiSFT:
enabled: false # keep multiSFT default in sync with sft chart's multiSFT.enabled
enableFederation: false # keep enableFederation default in sync with galley and cargohold chart's config.enableFederation as well as wire-server chart's tags.federation
enableFederation: false # keep in sync with background-worker, cargohold and galley charts' config.enableFederation as well as wire-server chart's tags.federation
# Not used if enableFederation is false
rabbitmq:
host: rabbitmq
Expand Down
2 changes: 1 addition & 1 deletion charts/cargohold/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ config:
logLevel: Info
logFormat: StructuredJSON
logNetStrings: false
enableFederation: false # keep enableFederation default in sync with brig and galley chart's config.enableFederation as well as wire-server chart's tags.federation
enableFederation: false # keep in sync with background-worker, brig and galley charts' config.enableFederation as well as wire-server chart's tags.federation
aws:
region: "eu-west-1"
s3Bucket: assets
Expand Down
2 changes: 1 addition & 1 deletion charts/galley/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ config:
# tlsCaSecretRef:
# name: <secret-name>
# key: <ca-attribute>
enableFederation: false # keep enableFederation default in sync with brig and cargohold chart's config.enableFederation as well as wire-server chart's tags.federation
enableFederation: false # keep in sync with background-worker, brig and cargohold charts' config.enableFederation as well as wire-server chart's tags.federation
# Not used if enableFederation is false
rabbitmq:
host: rabbitmq
Expand Down
1 change: 0 additions & 1 deletion charts/wire-server/requirements.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ dependencies:
repository: "file://../background-worker"
tags:
- background-worker
- federation
- haskellServices
- services
- name: integration
Expand Down
2 changes: 1 addition & 1 deletion charts/wire-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

tags:
legalhold: false
federation: false # see also galley.config.enableFederation and brig.config.enableFederation
federation: false # see also {background-worker, brig, cargohold, galley}.config.enableFederation
backoffice: false
mlsstats: false
integration: false
6 changes: 5 additions & 1 deletion docs/src/understand/configure-federation.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ certificate.
Read {ref}`choose-backend-domain` again, then
set the backend domain three times to the same value in the subcharts
cargohold, galley and brig. You also need to set `enableFederation` to
`true`.
`true` in background-worker in addition to those charts.

``` yaml
# override values for wire-server
Expand All @@ -393,6 +393,10 @@ cargohold:
enableFederation: true
settings:
federationDomain: example.com # your chosen "backend domain"

background-worker:
config:
enableFederation: true
```

(configure-federation-strategy-in-brig)=
Expand Down
3 changes: 2 additions & 1 deletion hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ tags:
cannon: true
cargohold: true
spar: true
federation: true # also see galley.config.enableFederation and brig.config.enableFederation
federation: true # also see {background-worker,brig,cargohold,galley}.config.enableFederation
backoffice: true
proxy: false
legalhold: false
Expand Down Expand Up @@ -485,6 +485,7 @@ background-worker:
requests: {}
imagePullPolicy: {{ .Values.imagePullPolicy }}
config:
enableFederation: true
backendNotificationPusher:
pushBackoffMinWait: 1000 # 1ms
pushBackoffMaxWait: 500000 # 0.5s
Expand Down
22 changes: 11 additions & 11 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Network.AMQP.Extended
( RabbitMqHooks (..),
RabbitMqAdminOpts (..),
RabbitMqOpts (..),
AmqpEndpoint (..),
openConnectionWithRetries,
mkRabbitMqAdminClientEnv,
mkRabbitMqChannelMVar,
Expand Down Expand Up @@ -103,29 +103,29 @@ mkRabbitMqAdminClientEnv opts = do
(either throwM pure <=< flip runClientM clientEnv)
(toServant $ adminClient basicAuthData)

-- | When admin opts are needed use `RabbitMqOpts Identity`, otherwise use
-- `RabbitMqOpts NoAdmin`.
data RabbitMqOpts = RabbitMqOpts
-- | When admin opts are needed use `AmqpEndpoint Identity`, otherwise use
-- `AmqpEndpoint NoAdmin`.
data AmqpEndpoint = AmqpEndpoint
{ host :: !String,
port :: !Int,
vHost :: !Text,
tls :: !(Maybe RabbitMqTlsOpts)
}
deriving (Show)

instance FromJSON RabbitMqOpts where
instance FromJSON AmqpEndpoint where
parseJSON = withObject "RabbitMqAdminOpts" $ \v ->
RabbitMqOpts
AmqpEndpoint
<$> v .: "host"
<*> v .: "port"
<*> v .: "vHost"
<*> parseTlsJson v

demoteOpts :: RabbitMqAdminOpts -> RabbitMqOpts
demoteOpts RabbitMqAdminOpts {..} = RabbitMqOpts {..}
demoteOpts :: RabbitMqAdminOpts -> AmqpEndpoint
demoteOpts RabbitMqAdminOpts {..} = AmqpEndpoint {..}

-- | Useful if the application only pushes into some queues.
mkRabbitMqChannelMVar :: Logger -> RabbitMqOpts -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar :: Logger -> AmqpEndpoint -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar l opts = do
chanMVar <- newEmptyMVar
connThread <-
Expand All @@ -152,10 +152,10 @@ openConnectionWithRetries ::
forall m.
(MonadIO m, MonadMask m, MonadBaseControl IO m) =>
Logger ->
RabbitMqOpts ->
AmqpEndpoint ->
RabbitMqHooks m ->
m ()
openConnectionWithRetries l RabbitMqOpts {..} hooks = do
openConnectionWithRetries l AmqpEndpoint {..} hooks = do
(username, password) <- liftIO $ readCredsFromEnv
connectWithRetries username password
where
Expand Down
61 changes: 34 additions & 27 deletions services/background-worker/src/Wire/BackendNotificationPusher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import Imports
import Network.AMQP qualified as Q
import Network.AMQP.Extended
import Network.AMQP.Lifted qualified as QL
import Network.RabbitMqAdmin
import Network.RabbitMqAdmin hiding (adminClient)
import Network.RabbitMqAdmin qualified as RabbitMqAdmin
import Prometheus
import Servant.Client qualified as Servant
import System.Logger.Class qualified as Log
import UnliftIO
import Wire.API.Federation.API
Expand Down Expand Up @@ -197,8 +199,8 @@ pairedMaximumOn f = maximumBy (compare `on` snd) . map (id &&& f)
-- FUTUREWORK: Recosider using 1 channel for many consumers. It shouldn't matter
-- for a handful of remote domains.
-- Consumers is passed in explicitly so that cleanup code has a reference to the consumer tags.
startPusher :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> AppT IO ()
startPusher consumersRef chan = do
startPusher :: RabbitMqAdmin.AdminAPI (Servant.AsClientT IO) -> IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> AppT IO ()
startPusher adminClient consumersRef chan = do
-- This ensures that we receive notifications 1 by 1 which ensures they are
-- delivered in order.
markAsWorking BackendNotificationPusher
Expand All @@ -221,7 +223,7 @@ startPusher consumersRef chan = do
]
$ forever
$ do
remotes <- getRemoteDomains
remotes <- getRemoteDomains adminClient
ensureConsumers consumersRef chan remotes
threadDelay timeBeforeNextRefresh

Expand Down Expand Up @@ -259,8 +261,8 @@ ensureConsumer consumers chan domain = do
-- let us come down this path if there is an old consumer.
liftIO $ forM_ oldTag $ Q.cancelConsumer chan . fst

getRemoteDomains :: AppT IO [Domain]
getRemoteDomains = do
getRemoteDomains :: RabbitMqAdmin.AdminAPI (Servant.AsClientT IO) -> AppT IO [Domain]
getRemoteDomains adminClient = do
-- Jittered exponential backoff with 10ms as starting delay and 60s as max
-- cumulative delay. When this is reached, the operation fails.
--
Expand All @@ -279,9 +281,8 @@ getRemoteDomains = do
where
go :: AppT IO [Domain]
go = do
client <- asks rabbitmqAdminClient
vhost <- asks rabbitmqVHost
queues <- liftIO $ listQueuesByVHost client vhost
queues <- liftIO $ listQueuesByVHost adminClient vhost
let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues
catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes
logInvalidDomain d e =
Expand All @@ -290,7 +291,7 @@ getRemoteDomains = do
. Log.field "queue" ("backend-notifications." <> d)
. Log.field "error" e

startWorker :: RabbitMqAdminOpts -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ())))
startWorker :: AmqpEndpoint -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ())))
startWorker rabbitmqOpts = do
env <- ask
-- These are used in the POSIX signal handlers, so we need to make
Expand All @@ -304,22 +305,28 @@ startWorker rabbitmqOpts = do
clearRefs = do
atomicWriteIORef chanRef Nothing
atomicWriteIORef consumersRef mempty
-- We can fire and forget this thread because it keeps respawning itself using the 'onConnectionClosedHandler'.
void $
async $
liftIO $
openConnectionWithRetries env.logger (demoteOpts rabbitmqOpts) $
RabbitMqHooks
{ -- The exception handling in `openConnectionWithRetries` won't open a new
-- connection on an explicit close call.
onNewChannel = \chan -> do
atomicWriteIORef chanRef $ pure chan
runAppT env $ startPusher consumersRef chan,
onChannelException = \_ -> do
clearRefs
runAppT env $ markAsNotWorking BackendNotificationPusher,
onConnectionClose = do
clearRefs
runAppT env $ markAsNotWorking BackendNotificationPusher
}
case env.rabbitmqAdminClient of
Nothing ->
Log.info $
Log.msg $
Log.val "RabbitMQ admin client not available, skipping backend notification pusher."
Just client ->
-- We can fire and forget this thread because it keeps respawning itself using the 'onConnectionClosedHandler'.
void $
async $
liftIO $
openConnectionWithRetries env.logger rabbitmqOpts $
RabbitMqHooks
{ -- The exception handling in `openConnectionWithRetries` won't open a new
-- connection on an explicit close call.
onNewChannel = \chan -> do
atomicWriteIORef chanRef $ pure chan
runAppT env $ startPusher client consumersRef chan,
onChannelException = \_ -> do
clearRefs
runAppT env $ markAsNotWorking BackendNotificationPusher,
onConnectionClose = do
clearRefs
runAppT env $ markAsNotWorking BackendNotificationPusher
}
pure (chanRef, consumersRef)
4 changes: 3 additions & 1 deletion services/background-worker/src/Wire/BackgroundWorker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Data.Metrics.Servant qualified as Metrics
import Data.Text qualified as T
import Imports
import Network.AMQP qualified as Q
import Network.AMQP.Extended (demoteOpts)
import Network.Wai.Utilities.Server
import Servant
import Servant.Server.Generic
Expand All @@ -21,7 +22,8 @@ import Wire.BackgroundWorker.Options
run :: Opts -> IO ()
run opts = do
env <- mkEnv opts
(notifChanRef, notifConsumersRef) <- runAppT env $ BackendNotificationPusher.startWorker opts.rabbitmq
let amqpEP = either id demoteOpts opts.rabbitmq.unRabbitMqOpts
(notifChanRef, notifConsumersRef) <- runAppT env $ BackendNotificationPusher.startWorker amqpEP
let -- cleanup will run in a new thread when the signal is caught, so we need to use IORefs and
-- specific exception types to message threads to clean up
l = logger env
Expand Down
6 changes: 3 additions & 3 deletions services/background-worker/src/Wire/BackgroundWorker/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ data Worker

data Env = Env
{ http2Manager :: Http2Manager,
rabbitmqAdminClient :: RabbitMqAdmin.AdminAPI (Servant.AsClientT IO),
rabbitmqAdminClient :: Maybe (RabbitMqAdmin.AdminAPI (Servant.AsClientT IO)),
rabbitmqVHost :: Text,
logger :: Logger,
federatorInternal :: Endpoint,
Expand Down Expand Up @@ -66,8 +66,8 @@ mkEnv opts = do
responseTimeoutNone
(\t -> responseTimeoutMicro $ 1000000 * t) -- seconds to microseconds
opts.defederationTimeout
rabbitmqVHost = opts.rabbitmq.vHost
rabbitmqAdminClient <- mkRabbitMqAdminClientEnv opts.rabbitmq
rabbitmqVHost = either (.vHost) (.vHost) opts.rabbitmq.unRabbitMqOpts
rabbitmqAdminClient <- for (rightToMaybe opts.rabbitmq.unRabbitMqOpts) mkRabbitMqAdminClientEnv
statuses <-
newIORef $
Map.fromList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data Opts = Opts
logFormat :: !(Maybe (Last LogFormat)),
backgroundWorker :: !Endpoint,
federatorInternal :: !Endpoint,
rabbitmq :: !RabbitMqAdminOpts,
rabbitmq :: !RabbitMqOpts,
-- | Seconds, Nothing for no timeout
defederationTimeout :: Maybe Int,
backendNotificationPusher :: BackendNotificationsConfig
Expand All @@ -37,3 +37,13 @@ data BackendNotificationsConfig = BackendNotificationsConfig
deriving (Show, Generic)

instance FromJSON BackendNotificationsConfig

newtype RabbitMqOpts = RabbitMqOpts {unRabbitMqOpts :: Either AmqpEndpoint RabbitMqAdminOpts}
deriving (Show)

instance FromJSON RabbitMqOpts where
parseJSON v =
RabbitMqOpts
<$> ( (Right <$> parseJSON v)
<|> (Left <$> parseJSON v)
)
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,13 @@ spec = do
let federatorInternal = Endpoint "localhost" 8097
http2Manager = undefined
statuses = undefined
rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin
rabbitmqAdminClient = Just $ mockRabbitMqAdminClient mockAdmin
rabbitmqVHost = "test-vhost"
defederationTimeout = responseTimeoutNone
backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000

backendNotificationMetrics <- mkBackendNotificationMetrics
domains <- runAppT Env {..} getRemoteDomains
domains <- runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient)
domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"]
readTVarIO mockAdmin.listQueuesVHostCalls `shouldReturn` ["test-vhost"]

Expand All @@ -287,12 +287,12 @@ spec = do
let federatorInternal = Endpoint "localhost" 8097
http2Manager = undefined
statuses = undefined
rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin
rabbitmqAdminClient = Just $ mockRabbitMqAdminClient mockAdmin
rabbitmqVHost = "test-vhost"
defederationTimeout = responseTimeoutNone
backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000
backendNotificationMetrics <- mkBackendNotificationMetrics
domainsThread <- async $ runAppT Env {..} getRemoteDomains
domainsThread <- async $ runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient)

-- Wait for first call
untilM (readTVarIO mockAdmin.listQueuesVHostCalls >>= \calls -> pure $ not $ null calls)
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ data Opts = Opts
-- | SFT Federation
multiSFT :: !(Maybe Bool),
-- | RabbitMQ settings, required when federation is enabled.
rabbitmq :: !(Maybe RabbitMqOpts),
rabbitmq :: !(Maybe AmqpEndpoint),
-- | AWS settings
aws :: !AWSOpts,
-- | Enable Random Prekey Strategy
Expand Down
2 changes: 1 addition & 1 deletion services/galley/src/Galley/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ data Opts = Opts
-- | Federator endpoint
_federator :: !(Maybe Endpoint),
-- | RabbitMQ settings, required when federation is enabled.
_rabbitmq :: !(Maybe RabbitMqOpts),
_rabbitmq :: !(Maybe AmqpEndpoint),
-- | Disco URL
_discoUrl :: !(Maybe Text),
-- | Other settings
Expand Down