From f1810ebdeb48f24303a82024822f1c2c5f5c082c Mon Sep 17 00:00:00 2001 From: Matthias Fischmann Date: Wed, 23 Aug 2023 10:39:13 +0200 Subject: [PATCH 1/9] Changelog. --- .../WPB-3797-do-not-cache-federation-remote-domain-config | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5-internal/WPB-3797-do-not-cache-federation-remote-domain-config diff --git a/changelog.d/5-internal/WPB-3797-do-not-cache-federation-remote-domain-config b/changelog.d/5-internal/WPB-3797-do-not-cache-federation-remote-domain-config new file mode 100644 index 00000000000..dfd7ed0f27f --- /dev/null +++ b/changelog.d/5-internal/WPB-3797-do-not-cache-federation-remote-domain-config @@ -0,0 +1 @@ +Do not cache federation remote configs on non-brig services From 270c479707f4a54cf1ee45d1bafc39b94a97567e Mon Sep 17 00:00:00 2001 From: Matthias Fischmann Date: Wed, 23 Aug 2023 10:47:07 +0200 Subject: [PATCH 2/9] nit-pick. --- libs/wire-api/src/Wire/API/FederationUpdate.hs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/libs/wire-api/src/Wire/API/FederationUpdate.hs b/libs/wire-api/src/Wire/API/FederationUpdate.hs index 0bafee435af..c21b2dd0ce2 100644 --- a/libs/wire-api/src/Wire/API/FederationUpdate.hs +++ b/libs/wire-api/src/Wire/API/FederationUpdate.hs @@ -34,8 +34,8 @@ syncFedDomainConfigs (Endpoint h p) log' cb = do updateDomainsThread <- async $ loop log' clientEnv cb ioref pure (ioref, updateDomainsThread) -deleteFedRemoteGalley :: Domain -> ClientM () -deleteFedRemoteGalley dom = namedClient @IAPI.API @"delete-federation-remote-from-galley" dom +deleteFederationRemoteGalley :: Domain -> ClientEnv -> IO (Either ClientError ()) +deleteFederationRemoteGalley dom = runClientM $ namedClient @IAPI.API @"delete-federation-remote-from-galley" dom -- | Initial function for getting the set of domains from brig, and an update interval initialize :: L.Logger -> ClientEnv -> IO FederationDomainConfigs @@ -56,9 +56,6 @@ initialize logger clientEnv = Just c -> pure c Nothing -> throwIO $ ErrorCall "*** Failed to reach brig for federation setup, giving up!" -deleteFederationRemoteGalley :: Domain -> ClientEnv -> IO (Either ClientError ()) -deleteFederationRemoteGalley dom = runClientM $ deleteFedRemoteGalley dom - loop :: L.Logger -> ClientEnv -> SyncFedDomainConfigsCallback -> IORef FederationDomainConfigs -> IO () loop logger clientEnv (SyncFedDomainConfigsCallback callback) env = forever $ catch go $ \(e :: SomeException) -> do From b78e3e463dd10829beaeefb602991663f1080cd6 Mon Sep 17 00:00:00 2001 From: Matthias Fischmann Date: Wed, 23 Aug 2023 10:49:12 +0200 Subject: [PATCH 3/9] Rip out fed remote domain caching from /libs/wire-api. --- .../wire-api/src/Wire/API/FederationUpdate.hs | 83 +------------------ 1 file changed, 3 insertions(+), 80 deletions(-) diff --git a/libs/wire-api/src/Wire/API/FederationUpdate.hs b/libs/wire-api/src/Wire/API/FederationUpdate.hs index c21b2dd0ce2..84c34ec507f 100644 --- a/libs/wire-api/src/Wire/API/FederationUpdate.hs +++ b/libs/wire-api/src/Wire/API/FederationUpdate.hs @@ -1,9 +1,6 @@ module Wire.API.FederationUpdate - ( syncFedDomainConfigs, - SyncFedDomainConfigsCallback (..), - emptySyncFedDomainConfigsCallback, + ( getFederationDomainConfigs, deleteFederationRemoteGalley, - fetch, ) where @@ -24,82 +21,8 @@ import Wire.API.Routes.FederationDomainConfig import Wire.API.Routes.Internal.Brig qualified as IAPI import Wire.API.Routes.Named (namedClient) --- | 'FedUpdateCallback' is not called if a new settings cannot be fetched, or if they are --- equal to the old settings. -syncFedDomainConfigs :: Endpoint -> L.Logger -> SyncFedDomainConfigsCallback -> IO (IORef FederationDomainConfigs, Async ()) -syncFedDomainConfigs (Endpoint h p) log' cb = do - let baseUrl = BaseUrl Http (unpack h) (fromIntegral p) "" - clientEnv <- newManager defaultManagerSettings <&> \mgr -> ClientEnv mgr baseUrl Nothing defaultMakeClientRequest - ioref <- newIORef =<< initialize log' clientEnv - updateDomainsThread <- async $ loop log' clientEnv cb ioref - pure (ioref, updateDomainsThread) +getFederationDomainConfigs :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) +getFederationDomainConfigs = runClientM (namedClient @IAPI.API @"get-federation-remotes") deleteFederationRemoteGalley :: Domain -> ClientEnv -> IO (Either ClientError ()) deleteFederationRemoteGalley dom = runClientM $ namedClient @IAPI.API @"delete-federation-remote-from-galley" dom - --- | Initial function for getting the set of domains from brig, and an update interval -initialize :: L.Logger -> ClientEnv -> IO FederationDomainConfigs -initialize logger clientEnv = - let policy :: R.RetryPolicy - policy = R.capDelay 30_000_000 $ R.exponentialBackoff 3_000 - - go :: IO (Maybe FederationDomainConfigs) - go = do - fetch clientEnv >>= \case - Right s -> pure $ Just s - Left e -> do - L.log logger L.Info $ - L.msg (L.val "Failed to reach brig for federation setup, retrying...") - L.~~ "error" L..= show e - pure Nothing - in R.retrying policy (const (pure . isNothing)) (const go) >>= \case - Just c -> pure c - Nothing -> throwIO $ ErrorCall "*** Failed to reach brig for federation setup, giving up!" - -loop :: L.Logger -> ClientEnv -> SyncFedDomainConfigsCallback -> IORef FederationDomainConfigs -> IO () -loop logger clientEnv (SyncFedDomainConfigsCallback callback) env = forever $ - catch go $ \(e :: SomeException) -> do - -- log synchronous exceptions - case fromException e of - -- Rethrow async exceptions so that we can kill this thread with the `async` tools - -- The use of cast here comes from https://hackage.haskell.org/package/base-4.18.0.0/docs/src/GHC.IO.Exception.html#asyncExceptionFromException - -- But I only want to check for AsyncCancelled while leaving non-async exception - -- logging in place. - Just (SomeAsyncException e') -> case cast e' of - Just AsyncCancelled -> throwIO e - Nothing -> pure () - Nothing -> - L.log logger L.Error $ - L.msg (L.val "Federation domain sync thread died, restarting domain synchronization.") - L.~~ "error" L..= displayException e - where - go = do - fetch clientEnv >>= \case - Left e -> - L.log logger L.Info $ - L.msg (L.val "Could not retrieve an updated list of federation domains from Brig; I'll keep trying!") - L.~~ "error" L..= displayException e - Right new -> do - old <- readIORef env - unless (domainListsEqual old new) $ callback old new - atomicWriteIORef env new - delay <- updateInterval <$> readIORef env - threadDelay (delay * 1_000_000) - - domainListsEqual o n = - Set.fromList (domain <$> remotes o) - == Set.fromList (domain <$> remotes n) - -fetch :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) -fetch = runClientM (namedClient @IAPI.API @"get-federation-remotes") - --- | The callback takes the previous and the new settings and runs a given action. -newtype SyncFedDomainConfigsCallback = SyncFedDomainConfigsCallback - { fromFedUpdateCallback :: - FederationDomainConfigs -> -- old value - FederationDomainConfigs -> -- new value - IO () - } - -emptySyncFedDomainConfigsCallback :: SyncFedDomainConfigsCallback -emptySyncFedDomainConfigsCallback = SyncFedDomainConfigsCallback $ \_ _ -> pure () From b20adabc3c25ed17174f933b5be90a9729807a92 Mon Sep 17 00:00:00 2001 From: Matthias Fischmann Date: Wed, 23 Aug 2023 14:04:45 +0200 Subject: [PATCH 4/9] ... --- .../wire-api/src/Wire/API/FederationUpdate.hs | 14 ++----------- libs/wire-api/wire-api.cabal | 4 ---- services/federator/federator.cabal | 1 + services/federator/src/Federator/Env.hs | 2 -- services/federator/src/Federator/Response.hs | 20 ++++++++++++++++--- services/federator/src/Federator/Run.hs | 9 ++++----- 6 files changed, 24 insertions(+), 26 deletions(-) diff --git a/libs/wire-api/src/Wire/API/FederationUpdate.hs b/libs/wire-api/src/Wire/API/FederationUpdate.hs index 84c34ec507f..a61cff5a9fc 100644 --- a/libs/wire-api/src/Wire/API/FederationUpdate.hs +++ b/libs/wire-api/src/Wire/API/FederationUpdate.hs @@ -4,25 +4,15 @@ module Wire.API.FederationUpdate ) where -import Control.Concurrent.Async -import Control.Exception -import Control.Retry qualified as R import Data.Domain -import Data.Set qualified as Set -import Data.Text -import Data.Typeable (cast) import Imports -import Network.HTTP.Client (defaultManagerSettings, newManager) -import Servant.Client (BaseUrl (BaseUrl), ClientEnv (ClientEnv), ClientError, ClientM, Scheme (Http), runClientM) -import Servant.Client.Internal.HttpClient (defaultMakeClientRequest) -import System.Logger qualified as L -import Util.Options +import Servant.Client (ClientEnv, ClientError, runClientM) import Wire.API.Routes.FederationDomainConfig import Wire.API.Routes.Internal.Brig qualified as IAPI import Wire.API.Routes.Named (namedClient) getFederationDomainConfigs :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) -getFederationDomainConfigs = runClientM (namedClient @IAPI.API @"get-federation-remotes") +getFederationDomainConfigs = runClientM $ namedClient @IAPI.API @"get-federation-remotes" deleteFederationRemoteGalley :: Domain -> ClientEnv -> IO (Either ClientError ()) deleteFederationRemoteGalley dom = runClientM $ namedClient @IAPI.API @"delete-federation-remote-from-galley" dom diff --git a/libs/wire-api/wire-api.cabal b/libs/wire-api/wire-api.cabal index 05cd5447b97..d503de1f934 100644 --- a/libs/wire-api/wire-api.cabal +++ b/libs/wire-api/wire-api.cabal @@ -221,7 +221,6 @@ library build-depends: aeson >=2.0.1.0 - , async , attoparsec >=0.10 , base >=4 && <5 , base64-bytestring >=1.0 @@ -255,7 +254,6 @@ library , hscim , HsOpenSSL , http-api-data - , http-client , http-media , http-types , imports @@ -277,7 +275,6 @@ library , quickcheck-instances >=0.3.16 , random >=1.2.0 , resourcet - , retry , saml2-web-sso , schema-profunctor , scientific @@ -297,7 +294,6 @@ library , tagged , text >=0.11 , time >=1.4 - , tinylog , transitive-anns , types-common >=0.16 , unordered-containers >=0.2 diff --git a/services/federator/federator.cabal b/services/federator/federator.cabal index e3e28a30089..0ced6146a6b 100644 --- a/services/federator/federator.cabal +++ b/services/federator/federator.cabal @@ -135,6 +135,7 @@ library , polysemy , polysemy-wire-zoo , servant + , servant-client , servant-client-core , servant-server , text diff --git a/services/federator/src/Federator/Env.hs b/services/federator/src/Federator/Env.hs index 52a581891a4..307d215da07 100644 --- a/services/federator/src/Federator/Env.hs +++ b/services/federator/src/Federator/Env.hs @@ -33,7 +33,6 @@ import OpenSSL.Session (SSLContext) import System.Logger.Class qualified as LC import Util.Options import Wire.API.Federation.Component -import Wire.API.Routes.FederationDomainConfig (FederationDomainConfigs) data Env = Env { _metrics :: Metrics, @@ -41,7 +40,6 @@ data Env = Env _requestId :: RequestId, _dnsResolver :: Resolver, _runSettings :: RunSettings, - _domainConfigs :: IORef FederationDomainConfigs, _service :: Component -> Endpoint, _externalPort :: Word16, _internalPort :: Word16, diff --git a/services/federator/src/Federator/Response.hs b/services/federator/src/Federator/Response.hs index 8c447cbc2fe..b17f576598a 100644 --- a/services/federator/src/Federator/Response.hs +++ b/services/federator/src/Federator/Response.hs @@ -53,10 +53,14 @@ import Polysemy.Input import Polysemy.Internal import Polysemy.TinyLog import Servant hiding (ServerError, respond, serve) +import Servant.Client (mkClientEnv) import Servant.Client.Core import Servant.Server.Generic import Servant.Types.SourceT -import Wire.API.Routes.FederationDomainConfig +import Util.Options (Endpoint (..)) +import Wire.API.FederationUpdate qualified as FedUp (getFederationDomainConfigs) +import Wire.API.MakesFederatedCall (Component (Brig)) +import Wire.API.Routes.FederationDomainConfig qualified as FedUp (FederationDomainConfigs) import Wire.Network.DNS.Effect import Wire.Sem.Logger.TinyLog @@ -143,7 +147,7 @@ type AllEffects = ServiceStreaming, Input RunSettings, Input Http2Manager, -- needed by Remote - Input FederationDomainConfigs, -- needed for the domain list. + Input FedUp.FederationDomainConfigs, -- needed for the domain list and federation policy. Input Env, -- needed by Service Error ValidationError, Error RemoteError, @@ -168,7 +172,7 @@ runFederator env = DiscoveryFailure ] . runInputConst env - . runInputSem (embed @IO (readIORef (view domainConfigs env))) + . runInputSem (embed @IO (getFederationDomainConfigs env)) . runInputSem (embed @IO (readIORef (view http2Manager env))) . runInputConst (view runSettings env) . interpretServiceHTTP @@ -176,6 +180,16 @@ runFederator env = . runFederatorDiscovery . interpretRemote +getFederationDomainConfigs :: Env -> IO FedUp.FederationDomainConfigs +getFederationDomainConfigs env = do + let mgr = env ^. httpManager + Endpoint h p = env ^. service $ Brig + baseurl = BaseUrl Http (cs h) (fromIntegral p) "" + clientEnv = mkClientEnv mgr baseurl + FedUp.getFederationDomainConfigs clientEnv >>= \case + Right v -> pure v + Left e -> error $ show e + streamingResponseToWai :: StreamingResponse -> Wai.Response streamingResponseToWai resp = let headers = toList (responseHeaders resp) diff --git a/services/federator/src/Federator/Run.hs b/services/federator/src/Federator/Run.hs index e3072294ec6..b229486f4cc 100644 --- a/services/federator/src/Federator/Run.hs +++ b/services/federator/src/Federator/Run.hs @@ -64,14 +64,13 @@ run opts = do let resolvConf = mkResolvConf (optSettings opts) DNS.defaultResolvConf DNS.withCachingResolver resolvConf $ \res -> do logger <- LogExt.mkLogger (Opt.logLevel opts) (Opt.logNetStrings opts) (Opt.logFormat opts) - (ioref, updateFedDomainsThread) <- syncFedDomainConfigs (brig opts) logger emptySyncFedDomainConfigsCallback - bracket (newEnv opts res logger ioref) closeEnv $ \env -> do + bracket (newEnv opts res logger) closeEnv $ \env -> do let externalServer = serveInward env portExternal internalServer = serveOutward env portInternal withMonitor logger (onNewSSLContext env) (optSettings opts) $ do internalServerThread <- async internalServer externalServerThread <- async externalServer - void $ waitAnyCancel [updateFedDomainsThread, internalServerThread, externalServerThread] + void $ waitAnyCancel [internalServerThread, externalServerThread] where endpointInternal = federatorInternal opts portInternal = fromIntegral $ endpointInternal ^. port @@ -91,8 +90,8 @@ run opts = do ------------------------------------------------------------------------------- -- Environment -newEnv :: Opts -> DNS.Resolver -> Log.Logger -> IORef FederationDomainConfigs -> IO Env -newEnv o _dnsResolver _applog _domainConfigs = do +newEnv :: Opts -> DNS.Resolver -> Log.Logger -> IO Env +newEnv o _dnsResolver _applog = do _metrics <- Metrics.metrics let _requestId = def _runSettings = Opt.optSettings o From c22d3ecc5f58bbf5bb20cc5142130b5578a95594 Mon Sep 17 00:00:00 2001 From: Owen Harvey Date: Tue, 26 Sep 2023 12:40:22 +1000 Subject: [PATCH 5/9] WPB-3797: More fixes after the develop merge --- .../background-worker/background-worker.cabal | 1 + services/background-worker/default.nix | 1 + .../src/Wire/BackendNotificationPusher.hs | 60 ++++++++----------- .../Wire/BackendNotificationPusherSpec.hs | 7 ++- 4 files changed, 33 insertions(+), 36 deletions(-) diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index 377e7487ae5..4676ccfb908 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -49,6 +49,7 @@ library , types-common , unliftio , wai-utilities + , wire-api , wire-api-federation default-extensions: diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix index 910b9a396dd..6037deec62c 100644 --- a/services/background-worker/default.nix +++ b/services/background-worker/default.nix @@ -70,6 +70,7 @@ mkDerivation { types-common unliftio wai-utilities + wire-api wire-api-federation ]; executableHaskellDepends = [ HsOpenSSL imports types-common ]; diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index 68114646b16..fa6e6d6ba4a 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -9,18 +9,22 @@ import Data.Aeson qualified as A import Data.Domain import Data.Map.Strict qualified as Map import Data.Set qualified as Set -import Data.Text qualified as Text +import Data.Text (unpack) import Imports import Network.AMQP (cancelConsumer) import Network.AMQP qualified as Q import Network.AMQP.Extended import Network.AMQP.Lifted qualified as QL -import Network.RabbitMqAdmin import Prometheus +import Servant.Client (BaseUrl (BaseUrl), ClientEnv, ClientError, Scheme (Http), mkClientEnv, runClientM) import System.Logger.Class qualified as Log import UnliftIO +import Util.Options (Endpoint (..)) import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client +import Wire.API.Routes.FederationDomainConfig (FederationDomainConfigs, domain, remotes, updateInterval) +import Wire.API.Routes.Internal.Brig qualified as IAPI +import Wire.API.Routes.Named (namedClient) import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util @@ -129,27 +133,17 @@ startPusher consumersRef chan = do [ Handler $ cleanup @SomeException, Handler $ cleanup @SomeAsyncException ] + $ forever $ do - -- Get an initial set of domains from the sync thread - -- The Chan that we will be waiting on isn't initialised with a - -- value until the domain update loop runs the callback for the - -- first time. - remotes <- getRemoteDomains - -- Get an initial set of consumers for the domains pulled from the IORef - -- so that we aren't just sitting around not doing anything for a bit at - -- the start. - ensureConsumers consumersRef chan remotes - -- Wait for updates to the domains, this is where the bulk of the action - -- is going to take place - forever $ do - liftIO $ threadDelay $ 60 * 1000 * 1000 -- TODO! - -- Wait for a new set of domains. This is a blocking action - -- so we will only move past here when we get a new set of domains. - -- It is a bit nicer than having another timeout value, as Brig is - -- already providing one in the domain update message. - newRemotes <- getRemoteDomains - -- Make new consumers for the new domains, clean up old ones from the consumer map. - ensureConsumers consumersRef chan newRemotes + -- Get a new set of domains. This is a blocking action + -- so we will only move past here when we get a new set of domains. + -- It is a bit nicer than having another timeout value, as Brig is + -- already providing one in the domain update message. + fedConfig <- getRemoteDomains + -- Make new consumers for the new domains, clean up old ones from the consumer map. + ensureConsumers consumersRef chan $ domain <$> remotes fedConfig + -- Wait the for as long as brig told us to + liftIO $ threadDelay $ updateInterval fedConfig * 1000 * 1000 -- TODO! ensureConsumers :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> [Domain] -> AppT IO () ensureConsumers consumers chan domains = do @@ -185,7 +179,10 @@ ensureConsumer consumers chan domain = do -- let us come down this path if there is an old consumer. liftIO $ forM_ oldTag $ Q.cancelConsumer chan . fst -getRemoteDomains :: AppT IO [Domain] +getFederationDomainConfigs :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) +getFederationDomainConfigs = runClientM $ namedClient @IAPI.API @"get-federation-remotes" + +getRemoteDomains :: AppT IO FederationDomainConfigs getRemoteDomains = do -- Jittered exponential backoff with 10ms as starting delay and 60s as max -- cumulative delay. When this is reached, the operation fails. @@ -203,18 +200,13 @@ getRemoteDomains = do <> [logRetries (const $ pure True) logErrr] recovering policy handlers $ const go where - go :: AppT IO [Domain] + go :: AppT IO FederationDomainConfigs go = do - client <- asks rabbitmqAdminClient - vhost <- asks rabbitmqVHost - queues <- liftIO $ listQueuesByVHost client vhost - let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues - catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes - logInvalidDomain d e = - Log.warn $ - Log.msg (Log.val "Found invalid domain in a backend notifications queue name") - . Log.field "queue" ("backend-notifications." <> d) - . Log.field "error" e + env <- ask + let Endpoint (unpack -> h) (fromIntegral -> p) = env.brig + clientEnv = mkClientEnv (httpManager env) $ BaseUrl Http h p "" + e <- liftIO $ getFederationDomainConfigs clientEnv + either throwM pure e startWorker :: RabbitMqAdminOpts -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ()))) startWorker rabbitmqOpts = do diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 383faf208bf..de5ee9d416b 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -41,6 +41,7 @@ import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Common import Wire.API.Federation.BackendNotifications import Wire.API.RawJson +import Wire.API.Routes.FederationDomainConfig import Wire.BackendNotificationPusher import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options @@ -192,7 +193,8 @@ spec = do backendNotificationsConfig = BackendNotificationsConfig 1000 500000 backendNotificationMetrics <- mkBackendNotificationMetrics - domains <- runAppT Env {..} getRemoteDomains + fedConfig <- runAppT Env {..} getRemoteDomains + let domains = domain <$> remotes fedConfig domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"] readTVarIO mockAdmin.listQueuesVHostCalls `shouldReturn` ["test-vhost"] @@ -211,7 +213,8 @@ spec = do brig = Endpoint "localhost" 8082 backendNotificationsConfig = BackendNotificationsConfig 1000 500000 backendNotificationMetrics <- mkBackendNotificationMetrics - domainsThread <- async $ runAppT Env {..} getRemoteDomains + fedConfigThread <- async $ runAppT Env {..} getRemoteDomains + let domainsThread = fmap domain . remotes <$> fedConfigThread -- Wait for first call untilM (readTVarIO mockAdmin.listQueuesVHostCalls >>= \calls -> pure $ not $ null calls) From 0b1bbe723f942b742e2a3b16be454a4ab4295114 Mon Sep 17 00:00:00 2001 From: Owen Harvey Date: Tue, 26 Sep 2023 14:25:46 +1000 Subject: [PATCH 6/9] WPB-3797: Fixing a test --- .../src/Wire/BackendNotificationPusher.hs | 36 ++++++++++++++++++- .../Wire/BackendNotificationPusherSpec.hs | 7 ++-- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index fa6e6d6ba4a..1995152d1c1 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -10,11 +10,13 @@ import Data.Domain import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Text (unpack) +import Data.Text qualified as Text import Imports import Network.AMQP (cancelConsumer) import Network.AMQP qualified as Q import Network.AMQP.Extended import Network.AMQP.Lifted qualified as QL +import Network.RabbitMqAdmin import Prometheus import Servant.Client (BaseUrl (BaseUrl), ClientEnv, ClientError, Scheme (Http), mkClientEnv, runClientM) import System.Logger.Class qualified as Log @@ -143,7 +145,7 @@ startPusher consumersRef chan = do -- Make new consumers for the new domains, clean up old ones from the consumer map. ensureConsumers consumersRef chan $ domain <$> remotes fedConfig -- Wait the for as long as brig told us to - liftIO $ threadDelay $ updateInterval fedConfig * 1000 * 1000 -- TODO! + liftIO $ threadDelay $ updateInterval fedConfig * 1000 * 1000 ensureConsumers :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> [Domain] -> AppT IO () ensureConsumers consumers chan domains = do @@ -182,6 +184,38 @@ ensureConsumer consumers chan domain = do getFederationDomainConfigs :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) getFederationDomainConfigs = runClientM $ namedClient @IAPI.API @"get-federation-remotes" +-- This exists fro background-worker integration testing when Brig isn't available, but Rabbit is from the running services. +getRemoteDomainsFromRabbit :: AppT IO [Domain] +getRemoteDomainsFromRabbit = do + -- Jittered exponential backoff with 10ms as starting delay and 60s as max + -- cumulative delay. When this is reached, the operation fails. + -- + -- FUTUREWORK: Pull these numbers into config + let policy = limitRetriesByCumulativeDelay 60_000_000 $ fullJitterBackoff 10000 + logErrr willRetry (SomeException e) rs = + Log.err $ + Log.msg (Log.val "Exception occurred while refreshig domains") + . Log.field "error" (displayException e) + . Log.field "willRetry" willRetry + . Log.field "retryCount" rs.rsIterNumber + handlers = + skipAsyncExceptions + <> [logRetries (const $ pure True) logErrr] + recovering policy handlers $ const go + where + go :: AppT IO [Domain] + go = do + client <- asks rabbitmqAdminClient + vhost <- asks rabbitmqVHost + queues <- liftIO $ listQueuesByVHost client vhost + let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues + catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes + logInvalidDomain d e = + Log.warn $ + Log.msg (Log.val "Found invalid domain in a backend notifications queue name") + . Log.field "queue" ("backend-notifications." <> d) + . Log.field "error" e + getRemoteDomains :: AppT IO FederationDomainConfigs getRemoteDomains = do -- Jittered exponential backoff with 10ms as starting delay and 60s as max diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index de5ee9d416b..dec7f228201 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -41,7 +41,6 @@ import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Common import Wire.API.Federation.BackendNotifications import Wire.API.RawJson -import Wire.API.Routes.FederationDomainConfig import Wire.BackendNotificationPusher import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options @@ -193,8 +192,7 @@ spec = do backendNotificationsConfig = BackendNotificationsConfig 1000 500000 backendNotificationMetrics <- mkBackendNotificationMetrics - fedConfig <- runAppT Env {..} getRemoteDomains - let domains = domain <$> remotes fedConfig + domains <- runAppT Env {..} getRemoteDomainsFromRabbit domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"] readTVarIO mockAdmin.listQueuesVHostCalls `shouldReturn` ["test-vhost"] @@ -213,8 +211,7 @@ spec = do brig = Endpoint "localhost" 8082 backendNotificationsConfig = BackendNotificationsConfig 1000 500000 backendNotificationMetrics <- mkBackendNotificationMetrics - fedConfigThread <- async $ runAppT Env {..} getRemoteDomains - let domainsThread = fmap domain . remotes <$> fedConfigThread + domainsThread <- async $ runAppT Env {..} getRemoteDomainsFromRabbit -- Wait for first call untilM (readTVarIO mockAdmin.listQueuesVHostCalls >>= \calls -> pure $ not $ null calls) From 7a8bb825cd5f262d9985eb8c065efd3e79846c34 Mon Sep 17 00:00:00 2001 From: Owen Harvey Date: Wed, 27 Sep 2023 17:23:18 +1000 Subject: [PATCH 7/9] Reverting some changes to check tests --- charts/background-worker/values.yaml | 1 + hack/helm_vars/wire-server/values.yaml.gotmpl | 1 + integration/test/Testlib/ModService.hs | 2 +- integration/test/Testlib/ResourcePool.hs | 20 +++++++++++++--- integration/test/Testlib/Types.hs | 3 ++- .../background-worker.integration.yaml | 1 + .../src/Wire/BackendNotificationPusher.hs | 24 +++++++------------ .../src/Wire/BackgroundWorker/Options.hs | 5 +++- .../Wire/BackendNotificationPusherSpec.hs | 8 +++---- .../background-worker/test/Test/Wire/Util.hs | 2 +- 10 files changed, 41 insertions(+), 26 deletions(-) diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index fcae0115bfc..a7a552a4536 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -26,6 +26,7 @@ config: backendNotificationPusher: pushBackoffMinWait: 10000 # in microseconds, so 10ms pushBackoffMaxWait: 300000000 # microseconds, so 300s + remotesRefreshInterval: 300000000 # microseconds, so 300s serviceAccount: # When setting this to 'false', either make sure that a service account named diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index f74522dd262..7185c30b4f7 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -306,6 +306,7 @@ background-worker: backendNotificationPusher: pushBackoffMinWait: 1000 # 1ms pushBackoffMaxWait: 500000 # 0.5s + remotesRefreshInterval: 1000000 # 1s secrets: rabbitmq: username: {{ .Values.rabbitmqUsername }} diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index b5e0b526004..5d6a444f770 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -172,7 +172,7 @@ startDynamicBackend resource beOverrides = do def { brigCfg = setField "optSettings.setFederationDomain" resource.berDomain - >=> setField "optSettings.setFederationDomainConfigs" ([] :: [Value]) + >=> setField "optSettings.setFederationDomainConfigs" resource.berFederationDomainConfigs >=> setField "federatorInternal.port" resource.berFederatorInternal >=> setField "federatorInternal.host" ("127.0.0.1" :: String) >=> setField "rabbitmq.vHost" resource.berVHost, diff --git a/integration/test/Testlib/ResourcePool.hs b/integration/test/Testlib/ResourcePool.hs index c7483ca9478..41ab4b75cbc 100644 --- a/integration/test/Testlib/ResourcePool.hs +++ b/integration/test/Testlib/ResourcePool.hs @@ -14,6 +14,7 @@ import Control.Concurrent import Control.Monad.Catch import Control.Monad.Codensity import Control.Monad.IO.Class +import Data.Aeson import Data.Foldable (for_) import Data.Function ((&)) import Data.Functor @@ -105,7 +106,8 @@ backendResources dynConfs = berVHost = dynConf.domain, berNginzSslPort = Ports.portForDyn Ports.NginzSSL i, berNginzHttp2Port = Ports.portForDyn Ports.NginzHttp2 i, - berInternalServicePorts = Ports.internalServicePorts name + berInternalServicePorts = Ports.internalServicePorts name, + berFederationDomainConfigs = [] } ) & Set.fromList @@ -136,7 +138,13 @@ backendA = berVHost = "backendA", berNginzSslPort = Ports.port Ports.NginzSSL BackendA, berInternalServicePorts = Ports.internalServicePorts BackendA, - berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendA + berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendA, + berFederationDomainConfigs = + [ object + [ fromString "domain" .= backendB.berDomain, + fromString "search_policy" .= "full_search" + ] + ] } backendB :: BackendResource @@ -165,5 +173,11 @@ backendB = berVHost = "backendB", berNginzSslPort = Ports.port Ports.NginzSSL BackendB, berInternalServicePorts = Ports.internalServicePorts BackendB, - berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendB + berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendB, + berFederationDomainConfigs = + [ object + [ fromString "domain" .= backendA.berDomain, + fromString "search_policy" .= "full_search" + ] + ] } diff --git a/integration/test/Testlib/Types.hs b/integration/test/Testlib/Types.hs index 268cc14f82a..b66ac629cf3 100644 --- a/integration/test/Testlib/Types.hs +++ b/integration/test/Testlib/Types.hs @@ -64,7 +64,8 @@ data BackendResource = BackendResource berVHost :: String, berNginzSslPort :: Word16, berNginzHttp2Port :: Word16, - berInternalServicePorts :: forall a. Num a => Service -> a + berInternalServicePorts :: forall a. Num a => Service -> a, + berFederationDomainConfigs :: [Value] } instance Eq BackendResource where diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index cc9a1b328b3..9f709cdae61 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -21,3 +21,4 @@ rabbitmq: backendNotificationPusher: pushBackoffMinWait: 1000 # 1ms pushBackoffMaxWait: 1000000 # 1s + remotesRefreshInterval: 10000 # 10ms \ No newline at end of file diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index cf5327e3782..1f4e49848e8 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -23,7 +23,7 @@ import UnliftIO import Util.Options (Endpoint (..)) import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client -import Wire.API.Routes.FederationDomainConfig (FederationDomainConfigs, domain, remotes, updateInterval) +import Wire.API.Routes.FederationDomainConfig (FederationDomainConfigs) import Wire.API.Routes.Internal.Brig qualified as IAPI import Wire.API.Routes.Named (namedClient) import Wire.BackgroundWorker.Env @@ -124,6 +124,7 @@ startPusher consumersRef chan = do consumers <- liftIO $ readIORef consumersRef traverse_ (liftIO . Q.cancelConsumer chan . fst) $ Map.elems consumers throwM e + timeBeforeNextRefresh <- asks (.backendNotificationsConfig.remotesRefreshInterval) -- If this thread is cancelled, catch the exception, kill the consumers, and carry on. -- FUTUREWORK?: -- If this throws an exception on the Chan / in the forever loop, the exception will @@ -135,15 +136,9 @@ startPusher consumersRef chan = do ] $ forever $ do - -- Get a new set of domains. This is a blocking action - -- so we will only move past here when we get a new set of domains. - -- It is a bit nicer than having another timeout value, as Brig is - -- already providing one in the domain update message. - fedConfig <- getRemoteDomains - -- Make new consumers for the new domains, clean up old ones from the consumer map. - ensureConsumers consumersRef chan $ domain <$> remotes fedConfig - -- Wait the for as long as brig told us to - liftIO $ threadDelay $ updateInterval fedConfig * 1000 * 1000 + remotes <- getRemoteDomains + ensureConsumers consumersRef chan remotes + threadDelay timeBeforeNextRefresh ensureConsumers :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> [Domain] -> AppT IO () ensureConsumers consumers chan domains = do @@ -182,9 +177,8 @@ ensureConsumer consumers chan domain = do getFederationDomainConfigs :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) getFederationDomainConfigs = runClientM $ namedClient @IAPI.API @"get-federation-remotes" --- This exists fro background-worker integration testing when Brig isn't available, but Rabbit is from the running services. -getRemoteDomainsFromRabbit :: AppT IO [Domain] -getRemoteDomainsFromRabbit = do +getRemoteDomains :: AppT IO [Domain] +getRemoteDomains = do -- Jittered exponential backoff with 10ms as starting delay and 60s as max -- cumulative delay. When this is reached, the operation fails. -- @@ -214,8 +208,8 @@ getRemoteDomainsFromRabbit = do . Log.field "queue" ("backend-notifications." <> d) . Log.field "error" e -getRemoteDomains :: AppT IO FederationDomainConfigs -getRemoteDomains = do +getRemoteDomainsFromBrig :: AppT IO FederationDomainConfigs +getRemoteDomainsFromBrig = do -- Jittered exponential backoff with 10ms as starting delay and 60s as max -- cumulative delay. When this is reached, the operation fails. -- diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index a3304e2eba9..056a57263b7 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -30,7 +30,10 @@ data BackendNotificationsConfig = BackendNotificationsConfig -- | Upper limit on amount of time (in microseconds) to wait before retrying -- any notification. This exists to ensure that exponential back-off doesn't -- cause wait times to be very big. - pushBackoffMaxWait :: Int + pushBackoffMaxWait :: Int, + -- | The list of remotes is refreshed at an interval. This value in + -- microseconds decides the interval for polling. + remotesRefreshInterval :: Int } deriving (Show, Generic) diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 966471401fb..31f1343cbbb 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -188,10 +188,10 @@ spec = do rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin rabbitmqVHost = "test-vhost" defederationTimeout = responseTimeoutNone - backendNotificationsConfig = BackendNotificationsConfig 1000 500000 + backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 backendNotificationMetrics <- mkBackendNotificationMetrics - domains <- runAppT Env {..} getRemoteDomainsFromRabbit + domains <- runAppT Env {..} getRemoteDomains domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"] readTVarIO mockAdmin.listQueuesVHostCalls `shouldReturn` ["test-vhost"] @@ -207,9 +207,9 @@ spec = do rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin rabbitmqVHost = "test-vhost" defederationTimeout = responseTimeoutNone - backendNotificationsConfig = BackendNotificationsConfig 1000 500000 + backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 backendNotificationMetrics <- mkBackendNotificationMetrics - domainsThread <- async $ runAppT Env {..} getRemoteDomainsFromRabbit + domainsThread <- async $ runAppT Env {..} getRemoteDomains -- Wait for first call untilM (readTVarIO mockAdmin.listQueuesVHostCalls >>= \calls -> pure $ not $ null calls) diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index 7736da11345..dc4bea0249e 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -24,7 +24,7 @@ testEnv = do rabbitmqVHost = undefined metrics = undefined defederationTimeout = responseTimeoutNone - backendNotificationsConfig = BackendNotificationsConfig 1000 500000 + backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 pure Env {..} runTestAppT :: AppT IO a -> Int -> IO a From 05b12f3f6e1fd0944f30d450cec5af2eb0ff1bbc Mon Sep 17 00:00:00 2001 From: Owen Harvey Date: Wed, 27 Sep 2023 17:26:42 +1000 Subject: [PATCH 8/9] More reverts --- integration/test/Testlib/ModService.hs | 2 +- integration/test/Testlib/ResourcePool.hs | 20 +++----------------- integration/test/Testlib/Types.hs | 3 +-- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index 5d6a444f770..b5e0b526004 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -172,7 +172,7 @@ startDynamicBackend resource beOverrides = do def { brigCfg = setField "optSettings.setFederationDomain" resource.berDomain - >=> setField "optSettings.setFederationDomainConfigs" resource.berFederationDomainConfigs + >=> setField "optSettings.setFederationDomainConfigs" ([] :: [Value]) >=> setField "federatorInternal.port" resource.berFederatorInternal >=> setField "federatorInternal.host" ("127.0.0.1" :: String) >=> setField "rabbitmq.vHost" resource.berVHost, diff --git a/integration/test/Testlib/ResourcePool.hs b/integration/test/Testlib/ResourcePool.hs index 41ab4b75cbc..c7483ca9478 100644 --- a/integration/test/Testlib/ResourcePool.hs +++ b/integration/test/Testlib/ResourcePool.hs @@ -14,7 +14,6 @@ import Control.Concurrent import Control.Monad.Catch import Control.Monad.Codensity import Control.Monad.IO.Class -import Data.Aeson import Data.Foldable (for_) import Data.Function ((&)) import Data.Functor @@ -106,8 +105,7 @@ backendResources dynConfs = berVHost = dynConf.domain, berNginzSslPort = Ports.portForDyn Ports.NginzSSL i, berNginzHttp2Port = Ports.portForDyn Ports.NginzHttp2 i, - berInternalServicePorts = Ports.internalServicePorts name, - berFederationDomainConfigs = [] + berInternalServicePorts = Ports.internalServicePorts name } ) & Set.fromList @@ -138,13 +136,7 @@ backendA = berVHost = "backendA", berNginzSslPort = Ports.port Ports.NginzSSL BackendA, berInternalServicePorts = Ports.internalServicePorts BackendA, - berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendA, - berFederationDomainConfigs = - [ object - [ fromString "domain" .= backendB.berDomain, - fromString "search_policy" .= "full_search" - ] - ] + berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendA } backendB :: BackendResource @@ -173,11 +165,5 @@ backendB = berVHost = "backendB", berNginzSslPort = Ports.port Ports.NginzSSL BackendB, berInternalServicePorts = Ports.internalServicePorts BackendB, - berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendB, - berFederationDomainConfigs = - [ object - [ fromString "domain" .= backendA.berDomain, - fromString "search_policy" .= "full_search" - ] - ] + berNginzHttp2Port = Ports.port Ports.NginzHttp2 BackendB } diff --git a/integration/test/Testlib/Types.hs b/integration/test/Testlib/Types.hs index b66ac629cf3..268cc14f82a 100644 --- a/integration/test/Testlib/Types.hs +++ b/integration/test/Testlib/Types.hs @@ -64,8 +64,7 @@ data BackendResource = BackendResource berVHost :: String, berNginzSslPort :: Word16, berNginzHttp2Port :: Word16, - berInternalServicePorts :: forall a. Num a => Service -> a, - berFederationDomainConfigs :: [Value] + berInternalServicePorts :: forall a. Num a => Service -> a } instance Eq BackendResource where From 6ceb6a6e9a78d05ca202d14aaef083603d6f5c43 Mon Sep 17 00:00:00 2001 From: Owen Harvey Date: Wed, 27 Sep 2023 18:16:35 +1000 Subject: [PATCH 9/9] Yet more cleanup of unused things, post-reverts --- .../templates/configmap.yaml | 4 --- .../background-worker/background-worker.cabal | 1 - .../background-worker.integration.yaml | 4 --- services/background-worker/default.nix | 1 - .../src/Wire/BackendNotificationPusher.hs | 35 ------------------- .../src/Wire/BackgroundWorker/Env.hs | 2 -- .../src/Wire/BackgroundWorker/Options.hs | 1 - .../Wire/BackendNotificationPusherSpec.hs | 2 -- .../background-worker/test/Test/Wire/Util.hs | 1 - 9 files changed, 51 deletions(-) diff --git a/charts/background-worker/templates/configmap.yaml b/charts/background-worker/templates/configmap.yaml index c7e739db243..1a03ad0d5e4 100644 --- a/charts/background-worker/templates/configmap.yaml +++ b/charts/background-worker/templates/configmap.yaml @@ -20,10 +20,6 @@ data: federatorInternal: host: federator port: 8080 - - brig: - host: brig - port: 8080 rabbitmq: {{toYaml .rabbitmq | indent 6 }} diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index 4676ccfb908..377e7487ae5 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -49,7 +49,6 @@ library , types-common , unliftio , wai-utilities - , wire-api , wire-api-federation default-extensions: diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index 9f709cdae61..32ff94e37ef 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -8,10 +8,6 @@ federatorInternal: host: 127.0.0.1 port: 8097 -brig: - host: 127.0.0.1 - port: 8082 - rabbitmq: host: 127.0.0.1 port: 5672 diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix index 6037deec62c..910b9a396dd 100644 --- a/services/background-worker/default.nix +++ b/services/background-worker/default.nix @@ -70,7 +70,6 @@ mkDerivation { types-common unliftio wai-utilities - wire-api wire-api-federation ]; executableHaskellDepends = [ HsOpenSSL imports types-common ]; diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index 1f4e49848e8..3bcceafac4c 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -9,7 +9,6 @@ import Data.Aeson qualified as A import Data.Domain import Data.Map.Strict qualified as Map import Data.Set qualified as Set -import Data.Text (unpack) import Data.Text qualified as Text import Imports import Network.AMQP qualified as Q @@ -17,15 +16,10 @@ import Network.AMQP.Extended import Network.AMQP.Lifted qualified as QL import Network.RabbitMqAdmin import Prometheus -import Servant.Client (BaseUrl (BaseUrl), ClientEnv, ClientError, Scheme (Http), mkClientEnv, runClientM) import System.Logger.Class qualified as Log import UnliftIO -import Util.Options (Endpoint (..)) import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client -import Wire.API.Routes.FederationDomainConfig (FederationDomainConfigs) -import Wire.API.Routes.Internal.Brig qualified as IAPI -import Wire.API.Routes.Named (namedClient) import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util @@ -174,9 +168,6 @@ ensureConsumer consumers chan domain = do -- let us come down this path if there is an old consumer. liftIO $ forM_ oldTag $ Q.cancelConsumer chan . fst -getFederationDomainConfigs :: ClientEnv -> IO (Either ClientError FederationDomainConfigs) -getFederationDomainConfigs = runClientM $ namedClient @IAPI.API @"get-federation-remotes" - getRemoteDomains :: AppT IO [Domain] getRemoteDomains = do -- Jittered exponential backoff with 10ms as starting delay and 60s as max @@ -208,32 +199,6 @@ getRemoteDomains = do . Log.field "queue" ("backend-notifications." <> d) . Log.field "error" e -getRemoteDomainsFromBrig :: AppT IO FederationDomainConfigs -getRemoteDomainsFromBrig = do - -- Jittered exponential backoff with 10ms as starting delay and 60s as max - -- cumulative delay. When this is reached, the operation fails. - -- - -- FUTUREWORK: Pull these numbers into config - let policy = limitRetriesByCumulativeDelay 60_000_000 $ fullJitterBackoff 10000 - logErrr willRetry (SomeException e) rs = - Log.err $ - Log.msg (Log.val "Exception occurred while refreshig domains") - . Log.field "error" (displayException e) - . Log.field "willRetry" willRetry - . Log.field "retryCount" rs.rsIterNumber - handlers = - skipAsyncExceptions - <> [logRetries (const $ pure True) logErrr] - recovering policy handlers $ const go - where - go :: AppT IO FederationDomainConfigs - go = do - env <- ask - let Endpoint (unpack -> h) (fromIntegral -> p) = env.brig - clientEnv = mkClientEnv (httpManager env) $ BaseUrl Http h p "" - e <- liftIO $ getFederationDomainConfigs clientEnv - either throwM pure e - startWorker :: RabbitMqAdminOpts -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ()))) startWorker rabbitmqOpts = do env <- ask diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 43f46fdb813..0d3080595f6 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -37,7 +37,6 @@ data Env = Env logger :: Logger, metrics :: Metrics.Metrics, federatorInternal :: Endpoint, - brig :: Endpoint, httpManager :: Manager, defederationTimeout :: ResponseTimeout, backendNotificationMetrics :: BackendNotificationMetrics, @@ -64,7 +63,6 @@ mkEnv opts = do logger <- Log.mkLogger opts.logLevel Nothing opts.logFormat httpManager <- newManager defaultManagerSettings let federatorInternal = opts.federatorInternal - brig = opts.brig defederationTimeout = maybe responseTimeoutNone diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index 056a57263b7..da31c41255a 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -11,7 +11,6 @@ data Opts = Opts logFormat :: !(Maybe (Last LogFormat)), backgroundWorker :: !Endpoint, federatorInternal :: !Endpoint, - brig :: !Endpoint, rabbitmq :: !RabbitMqAdminOpts, -- | Seconds, Nothing for no timeout defederationTimeout :: Maybe Int, diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 31f1343cbbb..243eb3d864b 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -181,7 +181,6 @@ spec = do logger <- Logger.new Logger.defSettings httpManager <- newManager defaultManagerSettings let federatorInternal = Endpoint "localhost" 8097 - brig = Endpoint "localhost" 8082 http2Manager = undefined statuses = undefined metrics = undefined @@ -200,7 +199,6 @@ spec = do logger <- Logger.new Logger.defSettings httpManager <- newManager defaultManagerSettings let federatorInternal = Endpoint "localhost" 8097 - brig = Endpoint "localhost" 8082 http2Manager = undefined statuses = undefined metrics = undefined diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index dc4bea0249e..ba698cccc2b 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -19,7 +19,6 @@ testEnv = do backendNotificationMetrics <- mkBackendNotificationMetrics httpManager <- newManager defaultManagerSettings let federatorInternal = Endpoint "localhost" 0 - brig = Endpoint "localhost" 8082 rabbitmqAdminClient = undefined rabbitmqVHost = undefined metrics = undefined