diff --git a/changelog.d/5-internal/integration-proper-cleanup b/changelog.d/5-internal/integration-proper-cleanup new file mode 100644 index 0000000000..79621ae8c3 --- /dev/null +++ b/changelog.d/5-internal/integration-proper-cleanup @@ -0,0 +1 @@ +Simplify process spawning of dynamic backends in integration tests diff --git a/integration/test/Testlib/Env.hs b/integration/test/Testlib/Env.hs index 6f82c901c7..2d98764ecf 100644 --- a/integration/test/Testlib/Env.hs +++ b/integration/test/Testlib/Env.hs @@ -60,23 +60,26 @@ mkGlobalEnv cfgFile = do & Cassandra.setContacts intConfig.cassandra.host [] & Cassandra.setPortNumber (fromIntegral intConfig.cassandra.port) cassClient <- Cassandra.init cassSettings + let resources = backendResources (Map.elems intConfig.dynamicBackends) resourcePool <- liftIO $ createBackendResourcePool - (Map.elems intConfig.dynamicBackends) + resources intConfig.rabbitmq cassClient + let sm = + Map.fromList $ + [ (intConfig.backendOne.originDomain, intConfig.backendOne.beServiceMap), + (intConfig.backendTwo.originDomain, intConfig.backendTwo.beServiceMap) + ] + <> [(berDomain resource, resourceServiceMap resource) | resource <- resources] tempDir <- Codensity $ withSystemTempDirectory "test" timeOutSeconds <- liftIO $ fromMaybe 10 . 
(readMaybe @Int =<<) <$> (lookupEnv "TEST_TIMEOUT_SECONDS") pure GlobalEnv - { gServiceMap = - Map.fromList - [ (intConfig.backendOne.originDomain, intConfig.backendOne.beServiceMap), - (intConfig.backendTwo.originDomain, intConfig.backendTwo.beServiceMap) - ], + { gServiceMap = sm, gDomain1 = intConfig.backendOne.originDomain, gDomain2 = intConfig.backendTwo.originDomain, gDynamicDomains = (.domain) <$> Map.elems intConfig.dynamicBackends, diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index e271bb3575..09c2b23c30 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -10,7 +10,6 @@ where import Control.Concurrent import Control.Concurrent.Async -import Control.Exception (finally) import Control.Exception qualified as E import Control.Monad.Catch (catch, throwM) import Control.Monad.Codensity @@ -22,7 +21,6 @@ import Data.Default import Data.Foldable import Data.Function import Data.Functor -import Data.Map.Strict qualified as Map import Data.Maybe import Data.Monoid import Data.String @@ -30,7 +28,6 @@ import Data.String.Conversions (cs) import Data.Text qualified as Text import Data.Text.IO qualified as Text import Data.Traversable -import Data.Word (Word16) import Data.Yaml qualified as Yaml import GHC.Stack import Network.HTTP.Client qualified as HTTP @@ -38,9 +35,8 @@ import System.Directory (copyFile, createDirectoryIfMissing, doesDirectoryExist, import System.FilePath import System.IO import System.IO.Temp (createTempDirectory, writeTempFile) -import System.Posix (killProcess, signalProcess) -import System.Process (CreateProcess (..), ProcessHandle, StdStream (..), createProcess, getPid, proc, terminateProcess, waitForProcess) -import System.Timeout (timeout) +import System.Posix (keyboardSignal, killProcess, signalProcess) +import System.Process import Testlib.App import Testlib.HTTP import Testlib.JSON @@ -67,17 +63,12 @@ copyDirectoryRecursively from to = do else 
copyFile fromPath toPath -- | Concurrent traverse in the 'Codensity App' monad. --- --- Every action is assumed to return an environment modification function. All --- actions are started concurrently, and once they all yield control to their --- continuation, the main continuation is run in an environment that --- accumulates all the individual environment changes. traverseConcurrentlyCodensity :: - (HasCallStack => a -> Codensity App (Env -> Env)) -> - (HasCallStack => [a] -> Codensity App (Env -> Env)) + (HasCallStack => a -> Codensity App ()) -> + (HasCallStack => [a] -> Codensity App ()) traverseConcurrentlyCodensity f args = do -- Create variables for synchronisation of the various threads: - -- * @result@ is used to store the environment change, or possibly an exception + -- * @result@ is used to store a possible exception -- * @done@ is used to signal that the main continuation has finished, so -- the thread can resume and move on to the cleanup phase. -- There is one pair of @(result, done)@ variables for each thread. @@ -90,12 +81,12 @@ traverseConcurrentlyCodensity f args = do -- two variables. This arrow will later be used to spawn a thread. runAction <- lift $ appToIOKleisli $ \((result, done), arg) -> catch - ( runCodensity (f arg) $ \a -> liftIO $ do - putMVar result (Right a) + ( runCodensity (f arg) $ \_ -> liftIO $ do + putMVar result Nothing takeMVar done ) $ \(e :: E.SomeException) -> - void . liftIO $ tryPutMVar result (Left e) + void . liftIO $ tryPutMVar result (Just e) -- Spawn threads. Here we use the fact that 'withAsync' implicitly returns a -- 'Codensity' action, and use the 'Monad' instance of 'Codensity' to @@ -107,17 +98,14 @@ traverseConcurrentlyCodensity f args = do k' <- appToIOKleisli k liftIO $ withAsync (runAction x) k' - -- Wait for all the threads to return environment changes in their result - -- variables. Any exception is rethrown here, and aborts the overall function. 
- fs <- liftIO $ for vars $ \(result, _) -> - takeMVar result >>= either throwM pure + -- Wait for all the threads to set their result variables. Any exception is + rethrown here, and aborts the overall function. + liftIO $ for_ vars $ \(result, _) -> + takeMVar result >>= maybe (pure ()) throwM Codensity $ \k -> do - -- Now compose all environment changes in some arbitrary order (they - -- should mostly commute, anyway). Then apply these changes to the actual - -- environment and also pass it to the main continuation @k@. - let modifyEnv = appEndo (foldMap Endo fs) - result <- local modifyEnv (k modifyEnv) + -- Now run the main continuation. + result <- k () -- Finally, signal all threads that it is time to clean up, and wait for -- them to finish. Note that this last block might not be executed in case -- cancelled in that case. liftIO $ traverse_ (\(_, d) -> putMVar d ()) vars liftIO $ traverse_ wait asyncs + pure result startDynamicBackends :: HasCallStack => [ServiceOverrides] -> (HasCallStack => [String] -> App a) -> App a startDynamicBackends beOverrides k = runCodensity - ( do - when (Prelude.length beOverrides > 3) $ lift $ failApp "Too many backends. Currently only 3 are supported." 
+ pool <- asks (.resourcePool) + resources <- acquireResources (Prelude.length beOverrides) pool + void $ traverseConcurrentlyCodensity (uncurry startDynamicBackend) (zip resources beOverrides) + pure $ map (.berDomain) resources k -startDynamicBackend :: HasCallStack => BackendResource -> ServiceOverrides -> Codensity App (Env -> Env) +startDynamicBackend :: HasCallStack => BackendResource -> ServiceOverrides -> Codensity App () startDynamicBackend resource beOverrides = do let overrides = mconcat @@ -151,7 +139,7 @@ startDynamicBackend resource beOverrides = do setLogLevel, beOverrides ] - startBackend resource overrides allServices + startBackend resource overrides where setAwsConfigs :: ServiceOverrides setAwsConfigs = @@ -227,152 +215,72 @@ startDynamicBackend resource beOverrides = do federatorInternalCfg = setField "logLevel" ("Warn" :: String) } +updateServiceMapInConfig :: BackendResource -> Service -> Value -> App Value +updateServiceMapInConfig resource forSrv config = + foldlM + ( \c (srv, port) -> do + overridden <- + c + & setField + (serviceName srv) + ( object + ( [ "host" .= ("127.0.0.1" :: String), + "port" .= port + ] + <> (["externalHost" .= ("127.0.0.1" :: String) | srv == Cannon]) + ) + ) + case (srv, forSrv) of + (Spar, Spar) -> do + overridden + -- FUTUREWORK: override "saml.spAppUri" and "saml.spSsoUri" with correct port, too? 
+ & setField "saml.spHost" ("127.0.0.1" :: String) + & setField "saml.spPort" port + _ -> pure overridden + ) + config + [(srv, berInternalServicePorts resource srv :: Int) | srv <- allServices] + startBackend :: HasCallStack => BackendResource -> ServiceOverrides -> - [Service] -> - Codensity App (Env -> Env) -startBackend resource overrides services = do - let domain = resource.berDomain - - let updateServiceMapInConfig :: Service -> Value -> App Value - updateServiceMapInConfig forSrv config = - foldlM - ( \c (srv, port) -> do - overridden <- - c - & setField - (serviceName srv) - ( object - ( [ "host" .= ("127.0.0.1" :: String), - "port" .= port - ] - <> (["externalHost" .= ("127.0.0.1" :: String) | srv == Cannon]) - ) - ) - case (srv, forSrv) of - (Spar, Spar) -> do - overridden - -- FUTUREWORK: override "saml.spAppUri" and "saml.spSsoUri" with correct port, too? - & setField "saml.spHost" ("127.0.0.1" :: String) - & setField "saml.spPort" port - _ -> pure overridden - ) - config - [(srv, berInternalServicePorts resource srv :: Int) | srv <- services] - - let serviceMap = - let g srv = HostPort "127.0.0.1" (berInternalServicePorts resource srv) - in ServiceMap - { brig = g Brig, - backgroundWorker = g BackgroundWorker, - cannon = g Cannon, - cargohold = g Cargohold, - federatorInternal = g FederatorInternal, - federatorExternal = HostPort "127.0.0.1" resource.berFederatorExternal, - galley = g Galley, - gundeck = g Gundeck, - nginz = g Nginz, - spar = g Spar, - -- FUTUREWORK: Set to g Proxy, when we add Proxy to spawned services - proxy = HostPort "127.0.0.1" 9087, - stern = g Stern - } - - instances <- lift $ do - for services $ \case - Nginz -> do - env <- ask - case env.servicesCwdBase of - Nothing -> startNginzK8s domain serviceMap - Just _ -> startNginzLocal domain resource.berNginzHttp2Port resource.berNginzSslPort serviceMap - srv -> do - readServiceConfig srv - >>= updateServiceMapInConfig srv - >>= lookupConfigOverride overrides srv - >>= startProcess 
domain srv - - let stopInstances = liftIO $ do - -- Running waitForProcess would hang for 30 seconds when the test suite - -- is run from within ghci, so we don't wait here. - for_ instances $ \(ph, path) -> do - terminateProcess ph - timeout 50000 (waitForProcess ph) >>= \case - Just _ -> pure () - Nothing -> do - timeout 100000 (waitForProcess ph) >>= \case - Just _ -> pure () - Nothing -> do - mPid <- getPid ph - for_ mPid (signalProcess killProcess) - void $ waitForProcess ph - whenM (doesFileExist path) $ removeFile path - whenM (doesDirectoryExist path) $ removeDirectoryRecursive path - - let modifyEnv env = env {serviceMap = Map.insert resource.berDomain serviceMap env.serviceMap} - checkServiceIsUp = \case - Nginz -> pure True - srv -> do - req <- baseRequest domain srv Unversioned "/i/status" - checkStatus <- appToIO $ do - res <- submit "GET" req - pure (res.status `elem` [200, 204]) - eith <- liftIO (E.try checkStatus) - pure $ either (\(_e :: HTTP.HttpException) -> False) id eith - - Codensity $ \action -> local modifyEnv $ do - waitForService <- - appToIOKleisli - ( \srv -> - retryRequestUntil - (checkServiceIsUp srv) - (show srv) - ) - ioAction <- appToIO (action ()) - ioEnsureReachable <- appToIO (ensureReachable resource.berDomain) - liftIO $ - ( mapConcurrently_ waitForService services - >> ioEnsureReachable - >> ioAction - ) - `finally` stopInstances - - pure modifyEnv - where - ensureReachable :: String -> App () - ensureReachable domain = do - env <- ask - let checkServiceIsUpReq = do - req <- - rawBaseRequest - env.domain1 - FederatorInternal - Unversioned - ("/rpc/" <> domain <> "/brig/api-version") - <&> (addHeader "Wire-Origin-Domain" env.domain1) - . (addJSONObject []) - checkStatus <- appToIO $ do - res <- submit "POST" req - -- If we get 533 here it means federation is not available between domains - -- but ingress is working, since we're processing the request. 
- let is200 = res.status == 200 - mInner <- lookupField res.json "inner" - isFedDenied <- case mInner of - Nothing -> pure False - Just inner -> do - label <- inner %. "label" & asString - pure $ res.status == 533 && label == "federation-denied" - - pure (is200 || isFedDenied) - eith <- liftIO (E.try checkStatus) - pure $ either (\(_e :: HTTP.HttpException) -> False) id eith - - when ((domain /= env.domain1) && (domain /= env.domain2)) $ do - retryRequestUntil checkServiceIsUpReq "Federator ingress" - -startProcess :: String -> Service -> Value -> App (ProcessHandle, FilePath) -startProcess domain srv = startProcess' domain (configName srv) + Codensity App () +startBackend resource overrides = do + traverseConcurrentlyCodensity (withProcess resource overrides) allServices + lift $ ensureBackendReachable resource.berDomain + +ensureBackendReachable :: String -> App () +ensureBackendReachable domain = do + env <- ask + let checkServiceIsUpReq = do + req <- + rawBaseRequest + env.domain1 + FederatorInternal + Unversioned + ("/rpc/" <> domain <> "/brig/api-version") + <&> (addHeader "Wire-Origin-Domain" env.domain1) + . (addJSONObject []) + checkStatus <- appToIO $ do + res <- submit "POST" req + + -- If we get 533 here it means federation is not available between domains + -- but ingress is working, since we're processing the request. + let is200 = res.status == 200 + mInner <- lookupField res.json "inner" + isFedDenied <- case mInner of + Nothing -> pure False + Just inner -> do + label <- inner %. 
"label" & asString + pure $ res.status == 533 && label == "federation-denied" + + pure (is200 || isFedDenied) + eith <- liftIO (E.try checkStatus) + pure $ either (\(_e :: HTTP.HttpException) -> False) id eith + + when ((domain /= env.domain1) && (domain /= env.domain2)) $ do + retryRequestUntil checkServiceIsUpReq "Federator ingress" processColors :: [(String, String -> String)] processColors = @@ -387,31 +295,90 @@ processColors = ("nginx", colored purpleish) ] -startProcess' :: String -> String -> Value -> App (ProcessHandle, FilePath) -startProcess' domain execName config = do - tempFile <- liftIO $ writeTempFile "/tmp" (execName <> "-" <> domain <> "-" <> ".yaml") (cs $ Yaml.encode config) - +data ServiceInstance = ServiceInstance + { handle :: ProcessHandle, + config :: FilePath + } + +timeout :: Int -> IO a -> IO (Maybe a) +timeout usecs action = either (const Nothing) Just <$> race (threadDelay usecs) action + +cleanupService :: ServiceInstance -> IO () +cleanupService inst = do + let ignoreExceptions action = E.catch action $ \(_ :: E.SomeException) -> pure () + ignoreExceptions $ do + mPid <- getPid inst.handle + for_ mPid (signalProcess keyboardSignal) + timeout 50000 (waitForProcess inst.handle) >>= \case + Just _ -> pure () + Nothing -> do + for_ mPid (signalProcess killProcess) + void $ waitForProcess inst.handle + whenM (doesFileExist inst.config) $ removeFile inst.config + whenM (doesDirectoryExist inst.config) $ removeDirectoryRecursive inst.config + +-- | Wait for a service to come up. +waitUntilServiceIsUp :: String -> Service -> App () +waitUntilServiceIsUp domain srv = + retryRequestUntil + (checkServiceIsUp domain srv) + (show srv) + +-- | Check if a service is up and running. 
+checkServiceIsUp :: String -> Service -> App Bool +checkServiceIsUp _ Nginz = pure True +checkServiceIsUp domain srv = do + req <- baseRequest domain srv Unversioned "/i/status" + checkStatus <- appToIO $ do + res <- submit "GET" req + pure (res.status `elem` [200, 204]) + eith <- liftIO (E.try checkStatus) + pure $ either (\(_e :: HTTP.HttpException) -> False) id eith + +withProcess :: BackendResource -> ServiceOverrides -> Service -> Codensity App () +withProcess resource overrides service = do + let domain = berDomain resource + sm <- lift $ getServiceMap domain + getConfig <- + lift . appToIO $ + readServiceConfig service + >>= updateServiceMapInConfig resource service + >>= lookupConfigOverride overrides service + let execName = configName service (cwd, exe) <- - asks \env -> case env.servicesCwdBase of + lift $ asks \env -> case env.servicesCwdBase of Nothing -> (Nothing, execName) Just dir -> (Just (dir execName), "../../dist" execName) - (_, Just stdoutHdl, Just stderrHdl, ph) <- liftIO $ createProcess (proc exe ["-c", tempFile]) {cwd = cwd, std_out = CreatePipe, std_err = CreatePipe} - let prefix = "[" <> execName <> "@" <> domain <> "] " - let colorize = fromMaybe id (lookup execName processColors) - void $ liftIO $ forkIO $ logToConsole colorize prefix stdoutHdl - void $ liftIO $ forkIO $ logToConsole colorize prefix stderrHdl - pure (ph, tempFile) + startNginzLocalIO <- lift $ appToIO $ startNginzLocal resource + + let initProcess = case (service, cwd) of + (Nginz, Nothing) -> startNginzK8s domain sm + (Nginz, Just _) -> startNginzLocalIO + _ -> do + config <- getConfig + tempFile <- writeTempFile "/tmp" (execName <> "-" <> domain <> "-" <> ".yaml") (cs $ Yaml.encode config) + (_, Just stdoutHdl, Just stderrHdl, ph) <- createProcess (proc exe ["-c", tempFile]) {cwd = cwd, std_out = CreatePipe, std_err = CreatePipe} + let prefix = "[" <> execName <> "@" <> domain <> "] " + let colorize = fromMaybe id (lookup execName processColors) + void $ forkIO $ 
logToConsole colorize prefix stdoutHdl + void $ forkIO $ logToConsole colorize prefix stderrHdl + pure $ ServiceInstance ph tempFile + + void $ Codensity $ \k -> do + iok <- appToIOKleisli k + liftIO $ E.bracket initProcess cleanupService iok + + lift $ waitUntilServiceIsUp domain service logToConsole :: (String -> String) -> String -> Handle -> IO () logToConsole colorize prefix hdl = do let go = - ( do - line <- hGetLine hdl - putStrLn (colorize (prefix <> line)) - go - ) + do + line <- hGetLine hdl + putStrLn (colorize (prefix <> line)) + go `E.catch` (\(_ :: E.IOException) -> pure ()) go @@ -425,7 +392,7 @@ retryRequestUntil reqAction err = do unless isUp $ failApp ("Timed out waiting for service " <> err <> " to come up") -startNginzK8s :: String -> ServiceMap -> App (ProcessHandle, FilePath) +startNginzK8s :: String -> ServiceMap -> IO ServiceInstance startNginzK8s domain sm = do tmpDir <- liftIO $ createTempDirectory "/tmp" ("nginz" <> "-" <> domain) liftIO $ @@ -446,10 +413,15 @@ startNginzK8s domain sm = do ) createUpstreamsCfg upstreamsCfg sm ph <- startNginz domain nginxConfFile "/" - pure (ph, tmpDir) + pure $ ServiceInstance ph tmpDir + +startNginzLocal :: BackendResource -> App ServiceInstance +startNginzLocal resource = do + let domain = berDomain resource + http2Port = berNginzHttp2Port resource + sslPort = berNginzSslPort resource + sm <- getServiceMap domain -startNginzLocal :: String -> Word16 -> Word16 -> ServiceMap -> App (ProcessHandle, FilePath) -startNginzLocal domain http2Port sslPort sm = do -- Create a whole temporary directory and copy all nginx's config files. -- This is necessary because nginx assumes local imports are relative to -- the location of the main configuration file. 
@@ -492,7 +464,7 @@ listen [::]:{ssl_port} ssl http2; -- override upstreams let upstreamsCfg = tmpDir "conf" "nginz" "upstreams" - createUpstreamsCfg upstreamsCfg sm + liftIO $ createUpstreamsCfg upstreamsCfg sm let upstreamFederatorTemplate = [r|upstream {name} { server 127.0.0.1:{port} max_fails=3 weight=1; @@ -514,12 +486,12 @@ server 127.0.0.1:{port} max_fails=3 weight=1; writeFile pidConfigFile (cs $ "pid " <> pid <> ";") -- start service - ph <- startNginz domain nginxConfFile tmpDir + ph <- liftIO $ startNginz domain nginxConfFile tmpDir -- return handle and nginx tmp dir path - pure (ph, tmpDir) + pure $ ServiceInstance ph tmpDir -createUpstreamsCfg :: String -> ServiceMap -> App () +createUpstreamsCfg :: String -> ServiceMap -> IO () createUpstreamsCfg upstreamsCfg sm = do liftIO $ whenM (doesFileExist upstreamsCfg) $ removeFile upstreamsCfg let upstreamTemplate = @@ -548,21 +520,19 @@ server 127.0.0.1:{port} max_fails=3 weight=1; & Text.replace "{port}" (cs $ show p) liftIO $ appendFile upstreamsCfg (cs upstream) -startNginz :: String -> FilePath -> FilePath -> App ProcessHandle +startNginz :: String -> FilePath -> FilePath -> IO ProcessHandle startNginz domain conf workingDir = do (_, Just stdoutHdl, Just stderrHdl, ph) <- - liftIO $ - createProcess - (proc "nginx" ["-c", conf, "-g", "daemon off;", "-e", "/dev/stdout"]) - { cwd = Just workingDir, - std_out = CreatePipe, - std_err = CreatePipe - } + createProcess + (proc "nginx" ["-c", conf, "-g", "daemon off;", "-e", "/dev/stdout"]) + { cwd = Just workingDir, + std_out = CreatePipe, + std_err = CreatePipe + } let prefix = "[" <> "nginz" <> "@" <> domain <> "] " let colorize = fromMaybe id (lookup "nginx" processColors) - void $ liftIO $ forkIO $ logToConsole colorize prefix stdoutHdl - void $ liftIO $ forkIO $ logToConsole colorize prefix stderrHdl + void $ forkIO $ logToConsole colorize prefix stdoutHdl + void $ forkIO $ logToConsole colorize prefix stderrHdl - -- return handle and nginx tmp dir path 
pure ph diff --git a/integration/test/Testlib/ResourcePool.hs b/integration/test/Testlib/ResourcePool.hs index 788c3c5dda..de35ea2d70 100644 --- a/integration/test/Testlib/ResourcePool.hs +++ b/integration/test/Testlib/ResourcePool.hs @@ -2,6 +2,8 @@ module Testlib.ResourcePool ( ResourcePool, BackendResource (..), DynamicBackendConfig (..), + resourceServiceMap, + backendResources, createBackendResourcePool, acquireResources, backendA, @@ -14,7 +16,6 @@ import Control.Monad.Catch import Control.Monad.Codensity import Control.Monad.IO.Class import Data.Foldable (for_) -import Data.Function ((&)) import Data.Functor import Data.IORef import Data.Set qualified as Set @@ -30,6 +31,25 @@ import Testlib.Ports qualified as Ports import Testlib.Types import Prelude +resourceServiceMap :: BackendResource -> ServiceMap +resourceServiceMap resource = + let g srv = HostPort "127.0.0.1" (berInternalServicePorts resource srv) + in ServiceMap + { brig = g Brig, + backgroundWorker = g BackgroundWorker, + cannon = g Cannon, + cargohold = g Cargohold, + federatorInternal = g FederatorInternal, + federatorExternal = HostPort "127.0.0.1" resource.berFederatorExternal, + galley = g Galley, + gundeck = g Gundeck, + nginz = g Nginz, + spar = g Spar, + -- FUTUREWORK: Set to g Proxy, when we add Proxy to spawned services + proxy = HostPort "127.0.0.1" 9087, + stern = g Stern + } + acquireResources :: forall m a. (Ord a, MonadIO m, MonadMask m, HasCallStack) => Int -> ResourcePool a -> Codensity m [a] acquireResources n pool = Codensity $ \f -> bracket acquire release $ \s -> do liftIO $ mapM_ pool.onAcquire s @@ -46,16 +66,15 @@ acquireResources n pool = Codensity $ \f -> bracket acquire release $ \s -> do waitQSemN pool.sem n atomicModifyIORef pool.resources $ swap . 
Set.splitAt n -createBackendResourcePool :: [DynamicBackendConfig] -> RabbitMQConfig -> ClientState -> IO (ResourcePool BackendResource) -createBackendResourcePool dynConfs rabbitmq cassClient = - let resources = backendResources dynConfs - cleanupBackend :: BackendResource -> IO () +createBackendResourcePool :: [BackendResource] -> RabbitMQConfig -> ClientState -> IO (ResourcePool BackendResource) +createBackendResourcePool resources rabbitmq cassClient = + let cleanupBackend :: BackendResource -> IO () cleanupBackend resource = do deleteAllRabbitMQQueues rabbitmq resource runClient cassClient $ deleteAllDynamicBackendConfigs resource in ResourcePool - <$> newQSemN (length dynConfs) - <*> newIORef resources + <$> newQSemN (length resources) + <*> newIORef (Set.fromList resources) <*> pure cleanupBackend deleteAllRabbitMQQueues :: RabbitMQConfig -> BackendResource -> IO () @@ -78,7 +97,7 @@ deleteAllDynamicBackendConfigs resource = write cql (defQueryParams LocalQuorum cql :: PrepQuery W () () cql = fromString $ "TRUNCATE " <> resource.berBrigKeyspace <> ".federation_remotes" -backendResources :: [DynamicBackendConfig] -> Set.Set BackendResource +backendResources :: [DynamicBackendConfig] -> [BackendResource] backendResources dynConfs = (zip dynConfs [1 ..]) <&> ( \(dynConf, i) -> @@ -107,7 +126,6 @@ backendResources dynConfs = berInternalServicePorts = Ports.internalServicePorts name } ) - & Set.fromList where suffix :: (Show a, Num a) => a -> String suffix i = show $ i + 2