Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
9823fd0
nit-pick.
fisx Aug 22, 2023
a347a75
nit-pick.
fisx Aug 22, 2023
88f202f
Call `getRemoteDomains` in a loop (again).
fisx Aug 22, 2023
8752761
Remove dead code.
fisx Aug 22, 2023
3d4ffc1
Don't call brig from background-worker.
fisx Aug 22, 2023
3998753
Changelog.
fisx Aug 22, 2023
06e2f3f
make sanitize-pr
fisx Aug 22, 2023
de782bf
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Aug 24, 2023
90477ca
Make config value required, make it milliseconds and Int.
fisx Aug 24, 2023
21c5770
rename.
fisx Aug 24, 2023
ff59df1
Fixups
fisx Aug 24, 2023
3e2a4c3
Fix bug about ensuring rabbitMQ queues and namespaces.
fisx Aug 24, 2023
07042e7
Fix integration tests.
fisx Aug 24, 2023
7265f2d
FUTUREWORK.
fisx Aug 24, 2023
cc820b8
TODO
fisx Aug 24, 2023
3252b16
Move BackgroundNotifications module to a more suitable place.
fisx Aug 25, 2023
a6230d3
Move BackgroundNotifications module (cleanup).
fisx Aug 25, 2023
de0cabd
Clean up ensureQueue mess.
fisx Aug 25, 2023
1277282
Cleanup `remotesRefreshInterval` unit inconsistency.
fisx Aug 25, 2023
883c063
make sanitize-pr
fisx Aug 25, 2023
f76a1c6
Remove seamless-migration-to-database-remotes hack.
fisx Aug 25, 2023
815d15d
Update changelog.
fisx Aug 25, 2023
ca6c633
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Aug 25, 2023
4ce60fc
Fixup
fisx Aug 25, 2023
e38a6c7
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Aug 28, 2023
cb6184c
...
fisx Aug 28, 2023
f3440fd
...
fisx Aug 28, 2023
15c0cef
...
fisx Aug 28, 2023
2200da8
...
fisx Sep 4, 2023
97856cd
Fixup
fisx Sep 4, 2023
d456a7c
Fixup
fisx Sep 4, 2023
b9afef6
Fixup
fisx Sep 4, 2023
eb7ee50
Fixup
fisx Sep 4, 2023
dc70e1c
Fixup
fisx Sep 4, 2023
1e7d00c
Fix two tests.
fisx Sep 4, 2023
c5bb489
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Sep 4, 2023
fddd9c9
stash
fisx Sep 5, 2023
91718c1
Added origin domain to e2e config, added search policy to failing test.
elland Sep 5, 2023
195c51c
Restored test assertion.
elland Sep 5, 2023
878b407
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Sep 5, 2023
24ce4f9
Bumped timeout for offline BE notification
elland Sep 5, 2023
b244bf0
Better names, better types.
fisx Sep 5, 2023
8c8da68
Cleanup.
fisx Sep 5, 2023
12d11cb
Cleanup.
fisx Sep 5, 2023
606476c
Merge remote-tracking branch 'refs/remotes/origin/WPB-3796/fix-backgr…
fisx Sep 5, 2023
b854f6d
Cleanup.
fisx Sep 5, 2023
d716699
Cleanup.
fisx Sep 5, 2023
055f55d
hi ci
elland Sep 6, 2023
abd25da
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Sep 6, 2023
3ac378b
Fix: AllowAll has default search policy FullSearch.
fisx Sep 6, 2023
29e3429
Revert "Fix: AllowAll has default search policy FullSearch."
fisx Sep 6, 2023
b246036
Fix federator integration tests. All green now?
fisx Sep 7, 2023
d47e4df
hi ci
elland Sep 7, 2023
f32375d
Fixup
fisx Sep 7, 2023
00fd31a
Merge remote-tracking branch 'refs/remotes/origin/WPB-3796/fix-backgr…
fisx Sep 7, 2023
67b7b2e
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Sep 7, 2023
2a86f02
Fixup
fisx Sep 7, 2023
b0f3e4d
hi ci
fisx Sep 7, 2023
4f246bf
Re-align brig-integration config file on ci.
fisx Sep 8, 2023
4d6e6da
Increase time to wait for events in /integration.
fisx Sep 8, 2023
06e21e2
Fixup?
fisx Sep 8, 2023
f2cf937
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Sep 8, 2023
a6927ba
Fixup!
fisx Sep 9, 2023
7c8bf45
*sigh*
fisx Sep 10, 2023
975f142
?!
fisx Sep 11, 2023
5f89f96
...
fisx Sep 11, 2023
3bbcf4d
Merge remote-tracking branch 'origin/develop' into WPB-3796/fix-backg…
fisx Sep 11, 2023
23b7130
Makefile: clean up federation_remotes before `make ci-fast`.
fisx Sep 11, 2023
4c34668
Fix one more.
fisx Sep 11, 2023
5f74ad8
Nit-pick.
fisx Sep 11, 2023
e65ef0b
...
fisx Sep 12, 2023
43dd731
stash
fisx Sep 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/WPB-3796
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[background-worker] Get federation remote domains from rabbitMQ, not from brig (this fixes a bug in federation policy `allowAll`)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no release made with this bug, so you can also just drop this changelog.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

2 changes: 0 additions & 2 deletions services/background-worker/background-worker.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ library
aeson
, amqp
, async
, base
, bilge
, bytestring-conversion
, containers
Expand All @@ -56,7 +55,6 @@ library
, types-common
, unliftio
, wai-utilities
, wire-api
, wire-api-federation

default-extensions:
Expand Down
2 changes: 0 additions & 2 deletions services/background-worker/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ mkDerivation {
aeson
amqp
async
base
bilge
bytestring-conversion
containers
Expand All @@ -81,7 +80,6 @@ mkDerivation {
types-common
unliftio
wai-utilities
wire-api
wire-api-federation
];
executableHaskellDepends = [ HsOpenSSL imports types-common ];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import System.Logger.Class qualified as Log
import UnliftIO
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Routes.FederationDomainConfig
import Wire.BackgroundWorker.Env
import Wire.BackgroundWorker.Options
import Wire.BackgroundWorker.Util
Expand Down Expand Up @@ -123,7 +122,7 @@ startPusher consumersRef chan = do
throwM e

-- If this thread is cancelled, catch the exception, kill the consumers, and carry on.
-- FUTUREWORK?:
--
-- If this throws an exception on the Chan / in the forever loop, the exception will
-- bubble all the way up and kill the pod. Kubernetes should restart the pod automatically.
flip
Expand All @@ -132,29 +131,15 @@ startPusher consumersRef chan = do
Handler $ cleanup @SomeAsyncException
]
$ do
-- Get an initial set of domains from the sync thread
-- The Chan that we will be waiting on isn't initialised with a
-- value until the domain update loop runs the callback for the
-- first time.
initRemotes <- liftIO $ readIORef env.remoteDomains
-- Get an initial set of consumers for the domains pulled from the IORef
-- so that we aren't just sitting around not doing anything for a bit at
-- the start.
ensureConsumers consumersRef chan $ domain <$> initRemotes.remotes
-- Wait for updates to the domains, this is where the bulk of the action
-- is going to take place
consumers <- newIORef mempty
forever $ do
-- Wait for a new set of domains. This is a blocking action
-- so we will only move past here when we get a new set of domains.
-- It is a bit nicer than having another timeout value, as Brig is
-- already providing one in the domain update message.
chanRemotes <- liftIO $ readChan env.remoteDomainsChan
-- Make new consumers for the new domains, clean up old ones from the consumer map.
ensureConsumers consumersRef chan $ domain <$> chanRemotes.remotes
remoteDomains <- getRemoteDomains
ensureConsumers consumers chan remoteDomains
threadDelay (round $ 1_000_000 * fromMaybe 60 env.backendNotificationsConfig.remotesRefreshInterval)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a Maybe? We can put default values in all the places, i.e. the helm chart and the integration test configs. IMO this is unnecessary complexity.


ensureConsumers :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> [Domain] -> AppT IO ()
ensureConsumers consumers chan domains = do
keys' <- Set.fromList . Map.keys <$> readIORef consumers
keys' <- Map.keysSet <$> readIORef consumers
let domains' = Set.fromList domains
droppedDomains = Set.difference keys' domains'
-- Loop over all of the new domains. We can check for existing consumers and add new ones.
Expand Down
4 changes: 1 addition & 3 deletions services/background-worker/src/Wire/BackgroundWorker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

module Wire.BackgroundWorker where

import Control.Concurrent.Async (cancel)
import Data.Domain
import Data.Map.Strict qualified as Map
import Data.Metrics.Servant qualified as Metrics
Expand All @@ -22,14 +21,13 @@ import Wire.Defederation as Defederation
-- FUTUREWORK: Start an http service with status and metrics endpoints
run :: Opts -> IO ()
run opts = do
(env, syncThread) <- mkEnv opts
env <- mkEnv opts
(defedChanRef, defedConsumerRef) <- runAppT env $ Defederation.startWorker opts.rabbitmq
(notifChanRef, notifConsumersRef) <- runAppT env $ BackendNotificationPusher.startWorker opts.rabbitmq
let -- cleanup will run in a new thread when the signal is caught, so we need to use IORefs and
-- specific exception types to message threads to clean up
l = logger env
cleanup = do
cancel syncThread
-- Cancel the consumers and wait for them to finish their processing step.
-- Defederation thread
Log.info (logger env) $ Log.msg (Log.val "Cancelling the defederation thread")
Expand Down
19 changes: 2 additions & 17 deletions services/background-worker/src/Wire/BackgroundWorker/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

module Wire.BackgroundWorker.Env where

import Control.Concurrent.Async
import Control.Concurrent.Chan
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Trans.Control
Expand All @@ -24,8 +22,6 @@ import System.Logger qualified as Log
import System.Logger.Class (Logger, MonadLogger (..))
import System.Logger.Extended qualified as Log
import Util.Options
import Wire.API.FederationUpdate
import Wire.API.Routes.FederationDomainConfig
import Wire.BackgroundWorker.Options

type IsWorking = Bool
Expand All @@ -45,10 +41,7 @@ data Env = Env
federatorInternal :: Endpoint,
httpManager :: Manager,
galley :: Endpoint,
brig :: Endpoint,
defederationTimeout :: ResponseTimeout,
remoteDomains :: IORef FederationDomainConfigs,
remoteDomainsChan :: Chan FederationDomainConfigs,
backendNotificationMetrics :: BackendNotificationMetrics,
-- This is needed so that the defederation worker can push
-- connection-removed notifications into the notifications channels.
Expand All @@ -71,27 +64,19 @@ mkBackendNotificationMetrics =
<*> register (vector "targetDomain" $ counter $ Prometheus.Info "wire_backend_notifications_errors" "Number of errors that occurred while pushing notifications")
<*> register (vector "targetDomain" $ gauge $ Prometheus.Info "wire_backend_notifications_stuck_queues" "Set to 1 when pushing notifications is stuck")

mkEnv :: Opts -> IO (Env, Async ())
mkEnv :: Opts -> IO Env
mkEnv opts = do
http2Manager <- initHttp2Manager
logger <- Log.mkLogger opts.logLevel Nothing opts.logFormat
httpManager <- newManager defaultManagerSettings
remoteDomainsChan <- newChan
let federatorInternal = opts.federatorInternal
galley = opts.galley
defederationTimeout =
maybe
responseTimeoutNone
(\t -> responseTimeoutMicro $ 1000000 * t) -- seconds to microseconds
opts.defederationTimeout
brig = opts.brig
rabbitmqVHost = opts.rabbitmq.vHost
callback =
SyncFedDomainConfigsCallback
{ fromFedUpdateCallback = \_old new -> do
writeChan remoteDomainsChan new
}
(remoteDomains, syncThread) <- syncFedDomainConfigs brig logger callback
rabbitmqAdminClient <- mkRabbitMqAdminClientEnv opts.rabbitmq
statuses <-
newIORef $
Expand All @@ -103,7 +88,7 @@ mkEnv opts = do
backendNotificationMetrics <- mkBackendNotificationMetrics
notificationChannel <- mkRabbitMqChannelMVar logger $ demoteOpts opts.rabbitmq
let backendNotificationsConfig = opts.backendNotificationPusher
pure (Env {..}, syncThread)
pure Env {..}

initHttp2Manager :: IO Http2Manager
initHttp2Manager = do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ data Opts = Opts
federatorInternal :: !Endpoint,
rabbitmq :: !RabbitMqAdminOpts,
galley :: !Endpoint,
brig :: !Endpoint,
-- | Seconds, Nothing for no timeout
defederationTimeout :: Maybe Int,
backendNotificationPusher :: BackendNotificationsConfig
Expand All @@ -31,7 +30,9 @@ data BackendNotificationsConfig = BackendNotificationsConfig
-- | Upper limit on amount of time (in microseconds) to wait before retrying
-- any notification. This exists to ensure that exponential back-off doesn't
-- cause wait times to be very big.
pushBackoffMaxWait :: Int
pushBackoffMaxWait :: Int,
-- | Number of seconds between two calls to `getRemoteDomains`.
remotesRefreshInterval :: Maybe Double

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using Double, can we just take a number of milliseconds? We won't need rounding like this. I think we can safely assume people running wire are capable of understanding units of time other than seconds.
This also makes it more in line with the config one line above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the last point, but I just think doubles are more ergonomic. Anyway, I'll align it with the rest.

}
deriving (Show, Generic)

Expand Down
17 changes: 0 additions & 17 deletions services/background-worker/src/Wire/Defederation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,16 @@ import Control.Monad.Catch
import Control.Retry
import Data.Aeson qualified as A
import Data.ByteString.Conversion
import Data.Domain
import Data.Text (unpack)
import Data.Text.Encoding
import Imports
import Network.AMQP qualified as Q
import Network.AMQP.Extended
import Network.AMQP.Lifted qualified as QL
import Network.HTTP.Client
import Network.HTTP.Types
import Servant.Client (BaseUrl (..), ClientEnv, Scheme (Http), mkClientEnv)
import System.Logger.Class qualified as Log
import Util.Options
import Util.Options qualified as O
import Wire.API.Federation.BackendNotifications
import Wire.API.Routes.FederationDomainConfig qualified as Fed
import Wire.BackgroundWorker.Env
import Wire.BackgroundWorker.Util

Expand Down Expand Up @@ -52,18 +47,6 @@ deleteFederationDomainInner' go (msg, envelope) = do
Log.msg (Log.val "Failed to delete federation domain")
. Log.field "error" err

mkBrigEnv :: AppT IO ClientEnv
mkBrigEnv = do
Endpoint brigHost brigPort <- asks brig
mkClientEnv
<$> asks httpManager
<*> pure (BaseUrl Http (unpack brigHost) (fromIntegral brigPort) "")

getRemoteDomains :: AppT IO [Domain]
getRemoteDomains = do
ref <- asks remoteDomains
fmap Fed.domain . Fed.remotes <$> readIORef ref

callGalleyDelete ::
( MonadReader Env m,
MonadMask m,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

module Test.Wire.BackendNotificationPusherSpec where

import Control.Concurrent.Chan
import Control.Exception
import Control.Monad.Trans.Except
import Data.Aeson qualified as Aeson
Expand Down Expand Up @@ -42,7 +41,6 @@ import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Common
import Wire.API.Federation.BackendNotifications
import Wire.API.RawJson
import Wire.API.Routes.FederationDomainConfig
import Wire.BackendNotificationPusher
import Wire.BackgroundWorker.Env
import Wire.BackgroundWorker.Options
Expand Down Expand Up @@ -220,8 +218,6 @@ spec = do
]
logger <- Logger.new Logger.defSettings
httpManager <- newManager defaultManagerSettings
remoteDomains <- newIORef defFederationDomainConfigs
remoteDomainsChan <- newChan
notificationChannel <- newEmptyMVar
let federatorInternal = Endpoint "localhost" 8097
http2Manager = undefined
Expand All @@ -231,8 +227,7 @@ spec = do
rabbitmqVHost = "test-vhost"
defederationTimeout = responseTimeoutNone
galley = Endpoint "localhost" 8085
brig = Endpoint "localhost" 8082
backendNotificationsConfig = BackendNotificationsConfig 1000 500000
backendNotificationsConfig = BackendNotificationsConfig 1000 500000 (Just 0.5)

backendNotificationMetrics <- mkBackendNotificationMetrics
domains <- runAppT Env {..} getRemoteDomains
Expand All @@ -243,8 +238,6 @@ spec = do
mockAdmin <- newMockRabbitMqAdmin True ["backend-notifications.foo.example"]
logger <- Logger.new Logger.defSettings
httpManager <- newManager defaultManagerSettings
remoteDomains <- newIORef defFederationDomainConfigs
remoteDomainsChan <- newChan
notificationChannel <- newEmptyMVar
let federatorInternal = Endpoint "localhost" 8097
http2Manager = undefined
Expand All @@ -254,8 +247,7 @@ spec = do
rabbitmqVHost = "test-vhost"
defederationTimeout = responseTimeoutNone
galley = Endpoint "localhost" 8085
brig = Endpoint "localhost" 8082
backendNotificationsConfig = BackendNotificationsConfig 1000 500000
backendNotificationsConfig = BackendNotificationsConfig 1000 500000 (Just 0.5)
backendNotificationMetrics <- mkBackendNotificationMetrics
domainsThread <- async $ runAppT Env {..} getRemoteDomains

Expand Down
7 changes: 1 addition & 6 deletions services/background-worker/test/Test/Wire/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

module Test.Wire.Util where

import Control.Concurrent.Chan
import Imports
import Network.HTTP.Client
import System.Logger.Class qualified as Logger
import Util.Options (Endpoint (..))
import Wire.API.Routes.FederationDomainConfig
import Wire.BackgroundWorker.Env hiding (federatorInternal, galley)
import Wire.BackgroundWorker.Env qualified as E
import Wire.BackgroundWorker.Options
Expand All @@ -20,17 +18,14 @@ testEnv = do
statuses <- newIORef mempty
backendNotificationMetrics <- mkBackendNotificationMetrics
httpManager <- newManager defaultManagerSettings
remoteDomains <- newIORef defFederationDomainConfigs
remoteDomainsChan <- newChan
notificationChannel <- newEmptyMVar
let federatorInternal = Endpoint "localhost" 0
rabbitmqAdminClient = undefined
rabbitmqVHost = undefined
metrics = undefined
galley = Endpoint "localhost" 8085
brig = Endpoint "localhost" 8082
defederationTimeout = responseTimeoutNone
backendNotificationsConfig = BackendNotificationsConfig 1000 500000
backendNotificationsConfig = BackendNotificationsConfig 1000 500000 (Just 0.5)
pure Env {..}

runTestAppT :: AppT IO a -> Int -> IO a
Expand Down