From 1454bfaca41addcc4218dedeb0d56e1a29368605 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 28 Mar 2025 10:27:18 +1100 Subject: [PATCH 01/10] POC: Add support for per project repo client Relates: ES-11383 --- .../s3/PerProjectClientManager.java | 166 ++++++++++++++++++ .../repositories/s3/S3BlobStore.java | 4 +- .../repositories/s3/S3RepositoryPlugin.java | 26 ++- .../repositories/s3/S3Service.java | 40 ++++- 4 files changed, 232 insertions(+), 4 deletions(-) create mode 100644 modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java new file mode 100644 index 0000000000000..6776653101e76 --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.s3; + +import com.amazonaws.http.IdleConnectionReaper; +import com.amazonaws.services.s3.AmazonS3; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.settings.ProjectSecrets; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CachedSupplier; +import org.elasticsearch.core.IOUtils; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class PerProjectClientManager implements ClusterStateListener { + + // Original settings at node startup time + private final Settings settings; + private final Function clientBuilder; + + // A map of per-project clients, where the key is the project ID and the value is a map of client name to client + private final Map> perProjectClientsCache; + + public PerProjectClientManager(Settings settings, Function clientBuilder) { + this.settings = settings; + this.clientBuilder = clientBuilder; + this.perProjectClientsCache = new ConcurrentHashMap<>(); + } + + public void clusterChanged(ClusterChangedEvent event) { + final Map currentProjects = event.state().metadata().projects(); + + final var updatedPerProjectClients = new HashMap>(); + for (var project : currentProjects.values()) { + final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); + if (projectSecrets == null) { + // This can only happen when a node restarts, it will be processed again when file settings are loaded + continue; + } + final Settings currentSettings = Settings.builder() + // merge with static settings such as max retries etc, exclude secure settings + // TODO: We may need to update this if per-project settings decide to support hierarchical overrides + .put(settings, false) + .setSecureSettings(projectSecrets.getSettings()) + .build(); + final Map clientSettings = S3ClientSettings.load(currentSettings); + + // TODO: Building and comparing the whole S3ClientSettings may be insufficient, we could just compare the relevant secrets + if (newOrUpdated(project.id(), clientSettings)) { + updatedPerProjectClients.put( + project.id(), + clientSettings.entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> new ClientHolder(entry.getValue()))) + ); + } + } + // Updated projects + for (var projectId : updatedPerProjectClients.keySet()) { + final Map old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId)); + if (old != null) { + IOUtils.closeWhileHandlingException(old.values()); + } + } + + // removed projects + for (var projectId : perProjectClientsCache.keySet()) { + if (currentProjects.containsKey(projectId) == false) { + final Map removed = perProjectClientsCache.remove(projectId); + assert removed != null; + IOUtils.closeWhileHandlingException(removed.values()); + } + } + } + + public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { + if (perProjectClientsCache.containsKey(projectId) == false) { + throw new IllegalArgumentException("project [" + projectId + "] does not exist"); + } + final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetadata.settings()); + final Map clientHolders = perProjectClientsCache.get(projectId); + + if (clientHolders.containsKey(clientName) == false) { + throw new IllegalArgumentException("client [" + clientName + "] does not exist"); + } + + return clientHolders.get(clientName).client(); + } + + /** + * Similar to S3Service#releaseCachedClients but only clears the cache for the given project. + * All clients for the project are closed and will be recreated on next access. Also, similar to S3Service#releaseCachedClients + */ + public void clearCacheForProject(ProjectId projectId) { + final Map old = perProjectClientsCache.get(projectId); + assert old != null : projectId; + IOUtils.closeWhileHandlingException(old.values()); + perProjectClientsCache.put( + projectId, + old.entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> new ClientHolder(entry.getValue().clientSettings()))) + ); + // TODO: do we need this? + // shutdown IdleConnectionReaper background thread + // it will be restarted on new client usage + IdleConnectionReaper.shutdown(); + } + + public void close() { + for (var clientHolders : perProjectClientsCache.values()) { + IOUtils.closeWhileHandlingException(clientHolders.values()); + } + perProjectClientsCache.clear(); + } + + private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) { + if (perProjectClientsCache.containsKey(projectId) == false) { + return true; + } + final var previousClientSettings = perProjectClientsCache.get(projectId) + .entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> entry.getValue().clientSettings())); + return currentClientSettings.equals(previousClientSettings) == false; + } + + private final class ClientHolder implements Closeable { + private final S3ClientSettings clientSettings; + private final CachedSupplier client; + + ClientHolder(S3ClientSettings clientSettings) { + this.clientSettings = clientSettings; + this.client = CachedSupplier.wrap(() -> new AmazonS3Reference(clientBuilder.apply(clientSettings))); + } + + public S3ClientSettings clientSettings() { + return clientSettings; + } + + public AmazonS3Reference client() { + return client.get(); + } + + public void close() { + client.get().decRef(); + } + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 129de029daf7a..f388ff45274db 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.Strings; @@ -490,7 +491,8 @@ private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobS @Override public void close() throws IOException { - service.onBlobStoreClose(); + // TODO: take actual project-id in account + service.onBlobStoreClose(Metadata.DEFAULT_PROJECT_ID); } @Override diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 37b960b33eb79..34a75be5a4c5e 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Nullable; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; @@ -86,15 +87,38 @@ protected S3Repository createRepository( @Override public Collection createComponents(PluginServices services) { - service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService())); + PerProjectClientManager perProjectClientManager = null; + if (services.projectResolver().supportsMultipleProjects()) { + perProjectClientManager = new PerProjectClientManager(settings, this.service.get()::buildClient); + services.clusterService().addListener(perProjectClientManager); + } + service.set( + s3Service( + services.environment(), + services.clusterService().getSettings(), + services.resourceWatcherService(), + perProjectClientManager + ) + ); this.service.get().refreshAndClearCache(S3ClientSettings.load(settings)); + return List.of(service); } + @Deprecated(forRemoval = true) S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { return new S3Service(environment, nodeSettings, resourceWatcherService); } + S3Service s3Service( + Environment environment, + Settings nodeSettings, + ResourceWatcherService resourceWatcherService, + @Nullable PerProjectClientManager perProjectClientManager + ) { + return new S3Service(environment, nodeSettings, resourceWatcherService, perProjectClientManager); + } + @Override public Map getRepositories( final Environment env, diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index 0e94275e0d919..2668b62be0bf7 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -41,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.rest.RestStatus; @@ -99,8 +101,19 @@ class S3Service implements Closeable { final TimeValue compareAndExchangeTimeToLive; final TimeValue compareAndExchangeAntiContentionDelay; final boolean isStateless; + private final PerProjectClientManager perProjectClientManager; + @Deprecated(forRemoval = true) S3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { + this(environment, nodeSettings, resourceWatcherService, null); + } + + S3Service( + Environment environment, + Settings nodeSettings, + ResourceWatcherService resourceWatcherService, + @Nullable PerProjectClientManager perProjectClientManager + ) { webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider( environment, System::getenv, @@ -111,6 +124,7 @@ class S3Service implements Closeable { compareAndExchangeTimeToLive = REPOSITORY_S3_CAS_TTL_SETTING.get(nodeSettings); compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings); isStateless = DiscoveryNode.isStateless(nodeSettings); + this.perProjectClientManager = perProjectClientManager; } /** @@ -153,6 +167,21 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) { } } + /** + * Delegates to {@link #client(RepositoryMetadata)} if per-project client is disabled. + * Otherwise, attempts to retrieve a per-project client by the project-id and repository metadata from the + * per-project client manager. + * Throws if project-id or the client does not exist. The client maybe initialized lazily. + */ + public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { + if (perProjectClientManager == null) { + assert ProjectId.DEFAULT.equals(projectId) : projectId; + return client(repositoryMetadata); + } else { + return perProjectClientManager.client(projectId, repositoryMetadata); + } + } + /** * Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them * by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata. @@ -303,13 +332,20 @@ private synchronized void releaseCachedClients() { IdleConnectionReaper.shutdown(); } - public void onBlobStoreClose() { - releaseCachedClients(); + public void onBlobStoreClose(ProjectId projectId) { + if (perProjectClientManager == null) { + releaseCachedClients(); + } else { + perProjectClientManager.clearCacheForProject(projectId); + } } @Override public void close() throws IOException { releaseCachedClients(); + if (perProjectClientManager != null) { + perProjectClientManager.close(); + } webIdentityTokenCredentialsProvider.shutdown(); } From 79b459484a0e823af0e49be59e7b9fb695bc5319 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 10 Apr 2025 18:51:35 +1000 Subject: [PATCH 02/10] fix initialization --- .../elasticsearch/repositories/s3/S3RepositoryPlugin.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 34a75be5a4c5e..c9cbf6466d342 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -89,7 +89,10 @@ protected S3Repository createRepository( public Collection createComponents(PluginServices services) { PerProjectClientManager perProjectClientManager = null; if (services.projectResolver().supportsMultipleProjects()) { - perProjectClientManager = new PerProjectClientManager(settings, this.service.get()::buildClient); + perProjectClientManager = new PerProjectClientManager( + settings, + s3ClientSettings -> this.service.get().buildClient(s3ClientSettings) + ); services.clusterService().addListener(perProjectClientManager); } service.set( From a75550312ceef8bfb56c56c90b4ac10f2679edae Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 10 Apr 2025 19:28:26 +1000 Subject: [PATCH 03/10] more comments --- .../elasticsearch/repositories/s3/S3BlobStore.java | 2 ++ .../elasticsearch/repositories/s3/S3Repository.java | 1 + .../org/elasticsearch/repositories/s3/S3Service.java | 11 ++++++++--- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index f388ff45274db..addddd2ac8796 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -116,6 +116,7 @@ class S3BlobStore implements BlobStore { S3RepositoriesMetrics s3RepositoriesMetrics, BackoffPolicy retryThrottledDeleteBackoffPolicy ) { + // TODO: add a projectId field, maybe null for cluster level blobstore this.service = service; this.bigArrays = bigArrays; this.bucket = bucket; @@ -311,6 +312,7 @@ public String toString() { } public AmazonS3Reference clientReference() { + // TODO: change to service.client(projectId, repositoryMetadata) return service.client(repositoryMetadata); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 0904f37e39743..62a8cde8bdf7d 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -292,6 +292,7 @@ class S3Repository extends MeteredBlobStoreRepository { buildBasePath(metadata), buildLocation(metadata) ); + // TODO: add a projectId field this.service = service; this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index 2668b62be0bf7..d945ee112db08 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -168,15 +168,20 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) { } /** - * Delegates to {@link #client(RepositoryMetadata)} if per-project client is disabled. + * Delegates to {@link #client(RepositoryMetadata)} when + * 1. per-project client is disabled + * 2. or when the blobstore is cluster level (projectId = null) * Otherwise, attempts to retrieve a per-project client by the project-id and repository metadata from the - * per-project client manager. - * Throws if project-id or the client does not exist. The client maybe initialized lazily. + * per-project client manager. Throws if project-id or the client does not exist. The client maybe initialized lazily. */ public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { if (perProjectClientManager == null) { + // Multi-Project is disabled and we have a single default project assert ProjectId.DEFAULT.equals(projectId) : projectId; return client(repositoryMetadata); + } else if (projectId == null) { + // Multi-Project is enabled and we are retrieving a client for the cluster level blobstore + return client(repositoryMetadata); } else { return perProjectClientManager.client(projectId, repositoryMetadata); } From 2f28ecaca81a22c8b4d5f6b383788fb3e03ddc01 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 10 Apr 2025 19:43:11 +1000 Subject: [PATCH 04/10] more comments --- .../repositories/s3/PerProjectClientManager.java | 3 +++ .../repositories/s3/S3BlobStore.java | 5 ++--- .../repositories/s3/S3RepositoryPlugin.java | 1 - .../elasticsearch/repositories/s3/S3Service.java | 15 ++++++++++++--- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java index 6776653101e76..bf39a6398fe54 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java @@ -62,6 +62,9 @@ public void clusterChanged(ClusterChangedEvent event) { .build(); final Map clientSettings = S3ClientSettings.load(currentSettings); + // TODO: clientSettings should not be empty, i.e. there should be at least one client configured + // Maybe log a warning if it is empty and continue. The project will not have usable client but that is probably ok. + // TODO: Building and comparing the whole S3ClientSettings may be insufficient, we could just compare the relevant secrets if (newOrUpdated(project.id(), clientSettings)) { updatedPerProjectClients.put( diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index addddd2ac8796..63445b2f4765e 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -25,7 +25,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.Strings; @@ -493,8 +492,8 @@ private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobS @Override public void close() throws IOException { - // TODO: take actual project-id in account - service.onBlobStoreClose(Metadata.DEFAULT_PROJECT_ID); + // TODO: change to use service.onBlobStoreClose(projectId) + service.onBlobStoreClose(); } @Override diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index c9cbf6466d342..fe09dd225cebc 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -104,7 +104,6 @@ public Collection createComponents(PluginServices services) { ) ); this.service.get().refreshAndClearCache(S3ClientSettings.load(settings)); - return List.of(service); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index d945ee112db08..72a4a4e8aa4e2 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -174,7 +174,7 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) { * Otherwise, attempts to retrieve a per-project client by the project-id and repository metadata from the * per-project client manager. Throws if project-id or the client does not exist. The client maybe initialized lazily. */ - public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { + public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) { if (perProjectClientManager == null) { // Multi-Project is disabled and we have a single default project assert ProjectId.DEFAULT.equals(projectId) : projectId; @@ -337,9 +337,18 @@ private synchronized void releaseCachedClients() { IdleConnectionReaper.shutdown(); } - public void onBlobStoreClose(ProjectId projectId) { + public void onBlobStoreClose() { + releaseCachedClients(); + } + + public void onBlobStoreClose(@Nullable ProjectId projectId) { if (perProjectClientManager == null) { - releaseCachedClients(); + // Multi-Project is disabled and we have a single default project + assert ProjectId.DEFAULT.equals(projectId) : projectId; + onBlobStoreClose(); + } else if (projectId == null) { + // Multi-Project is enabled and this is for the cluster level blobstore + onBlobStoreClose(); } else { perProjectClientManager.clearCacheForProject(projectId); } From df7120bdf654a1cc1289ae6da7702a651097d957 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 10 Apr 2025 20:32:12 +1000 Subject: [PATCH 05/10] fix --- .../repositories/s3/RepositoryCredentialsTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index eb4cd955c81b1..f3a879962e3de 100644 --- a/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -252,7 +252,12 @@ public ProxyS3RepositoryPlugin(Settings settings) { } @Override - S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { + S3Service s3Service( + Environment environment, + Settings nodeSettings, + ResourceWatcherService resourceWatcherService, + PerProjectClientManager perProjectClientManager + ) { return new ProxyS3Service(environment, nodeSettings, resourceWatcherService); } From b3195ebae10503463c79187fd87eaef0c7c378e5 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 11 Apr 2025 11:39:54 +1000 Subject: [PATCH 06/10] per-project clients for GCS --- ...eCloudStorageBlobStoreRepositoryTests.java | 9 +- .../gcs/GcsPerProjectClientManager.java | 134 ++++++++++++++++++ .../gcs/GoogleCloudStoragePlugin.java | 39 +++-- .../gcs/GoogleCloudStorageService.java | 43 +++++- .../gcs/GoogleCloudStorageServiceTests.java | 8 +- 5 files changed, 217 insertions(+), 16 deletions(-) create mode 100644 modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 01466467a2bcd..8661c76fbf026 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -235,7 +235,10 @@ public TestGoogleCloudStoragePlugin(Settings settings) { } @Override - protected GoogleCloudStorageService createStorageService(boolean isServerless) { + protected GoogleCloudStorageService createStorageService( + boolean isServerless, + GcsPerProjectClientManager gcsPerProjectClientManager + ) { return new GoogleCloudStorageService() { @Override StorageOptions createStorageOptions( @@ -279,7 +282,7 @@ public Map getRepositories( metadata -> new GoogleCloudStorageRepository( metadata, registry, - this.storageService, + this.storageService.get(), clusterService, bigArrays, recoverySettings, @@ -291,7 +294,7 @@ protected GoogleCloudStorageBlobStore createBlobStore() { metadata.settings().get("bucket"), "test", metadata.name(), - storageService, + storageService.get(), bigArrays, randomIntBetween(1, 8) * 1024, BackoffPolicy.noBackoff(), diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java new file mode 100644 index 0000000000000..bc378ac2d08b8 --- /dev/null +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.gcs; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.common.settings.ProjectSecrets; +import org.elasticsearch.common.settings.Settings; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; + +public class GcsPerProjectClientManager implements ClusterStateListener { + + private final Settings settings; + private final BiFunction clientBuilder; + private final Map perProjectClientsCache; + + public GcsPerProjectClientManager( + Settings settings, + BiFunction clientBuilder + ) { + this.settings = settings; + this.clientBuilder = clientBuilder; + this.perProjectClientsCache = new ConcurrentHashMap<>(); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + final Map currentProjects = event.state().metadata().projects(); + + final var updatedPerProjectClients = new HashMap(); + for (var project : currentProjects.values()) { + final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); + if (projectSecrets == null) { + // This can only happen when a node restarts, it will be processed again when file settings are loaded + continue; + } + final Settings currentSettings = Settings.builder() + // merge with static settings such as max retries etc, exclude secure settings + // TODO: We may need to update this if per-project settings decide to support hierarchical overrides + .put(settings, false) + .setSecureSettings(projectSecrets.getSettings()) + .build(); + final Map clientSettings = GoogleCloudStorageClientSettings.load(currentSettings); + + // TODO: clientSettings should not be empty, i.e. there should be at least one client configured + // Maybe log a warning if it is empty and continue. The project will not have usable client but that is probably ok. + + // TODO: Building and comparing the whole GoogleCloudStorageClientSettings may be insufficient, we could just compare the + // relevant secrets + if (newOrUpdated(project.id(), clientSettings)) { + updatedPerProjectClients.put(project.id(), new ClientHolder(clientSettings)); + } + } + // Updated projects + perProjectClientsCache.putAll(updatedPerProjectClients); + + // removed projects + for (var projectId : perProjectClientsCache.keySet()) { + if (currentProjects.containsKey(projectId) == false) { + perProjectClientsCache.remove(projectId); + } + } + } + + public MeteredStorage client( + ProjectId projectId, + String clientName, + String repositoryName, + GcsRepositoryStatsCollector statsCollector + ) { + final var clientHolder = perProjectClientsCache.get(projectId); + if (clientHolder == null) { + throw new IllegalArgumentException("No project found for [" + projectId + "]"); + } + return clientHolder.client(clientName, repositoryName, statsCollector); + } + + public void closeRepositoryClients(ProjectId projectId, String repositoryName) { + final var clientHolder = perProjectClientsCache.get(projectId); + if (clientHolder != null) { + clientHolder.closeRepositoryClients(repositoryName); + } + } + + private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) { + if (perProjectClientsCache.containsKey(projectId) == false) { + return true; + } + final var previousClientSettings = perProjectClientsCache.get(projectId).getClientSettings(); + return currentClientSettings.equals(previousClientSettings) == false; + } + + private final class ClientHolder { + // clientName -> client settings + private final Map clientSettings; + // repositoryName -> client + private final Map clientCache = new ConcurrentHashMap<>(); + + ClientHolder(Map clientSettings) { + this.clientSettings = clientSettings; + } + + public Map getClientSettings() { + return clientSettings; + } + + MeteredStorage client(String clientName, String repositoryName, GcsRepositoryStatsCollector statsCollector) { + return clientCache.computeIfAbsent(repositoryName, ignored -> { + final var settings = clientSettings.get(clientName); + if (settings == null) { + throw new IllegalArgumentException("No client settings found for [" + clientName + "]"); + } + return clientBuilder.apply(settings, statsCollector); + }); + } + + void closeRepositoryClients(String repositoryName) { + clientCache.remove(repositoryName); + } + } +} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 3f7d3ae4825ff..1c67c9dcabca9 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -9,11 +9,13 @@ package org.elasticsearch.repositories.gcs; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Nullable; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; @@ -23,7 +25,10 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.xcontent.NamedXContentRegistry; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -31,19 +36,37 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin { // package-private for tests - final GoogleCloudStorageService storageService; + final SetOnce storageService = new SetOnce<>(); - @SuppressWarnings("this-escape") - public GoogleCloudStoragePlugin(final Settings settings) { + public GoogleCloudStoragePlugin(final Settings settings) {} + + @Override + public Collection createComponents(PluginServices services) { + final Settings settings = services.clusterService().getSettings(); + GcsPerProjectClientManager gcsPerProjectClientManager = null; + if (services.projectResolver().supportsMultipleProjects()) { + gcsPerProjectClientManager = new GcsPerProjectClientManager(settings, (gcsClientSettings, statsCollector) -> { + try { + return storageService.get().createClient(gcsClientSettings, statsCollector); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + services.clusterService().addListener(gcsPerProjectClientManager); + } var isServerless = DiscoveryNode.isStateless(settings); - this.storageService = createStorageService(isServerless); + this.storageService.set(createStorageService(isServerless, gcsPerProjectClientManager)); // eagerly load client settings so that secure settings are readable (not closed) reload(settings); + return List.of(); } // overridable for tests - protected GoogleCloudStorageService createStorageService(boolean isServerless) { - return new GoogleCloudStorageService(isServerless); + protected GoogleCloudStorageService createStorageService( + boolean isServerless, + @Nullable GcsPerProjectClientManager gcsPerProjectClientManager + ) { + return new GoogleCloudStorageService(isServerless, gcsPerProjectClientManager); } @Override @@ -60,7 +83,7 @@ public Map getRepositories( metadata -> new GoogleCloudStorageRepository( metadata, namedXContentRegistry, - this.storageService, + this.storageService.get(), clusterService, bigArrays, recoverySettings, @@ -93,6 +116,6 @@ public void reload(Settings settings) { // `GoogleCloudStorageClientSettings` instance) instead of the `Settings` // instance. final Map clientsSettings = GoogleCloudStorageClientSettings.load(settings); - this.storageService.refreshAndClearCache(clientsSettings); + this.storageService.get().refreshAndClearCache(clientsSettings); } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index bb83b767abb4c..19c7a733269e3 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; @@ -56,13 +57,16 @@ public class GoogleCloudStorageService { private volatile Map clientSettings = emptyMap(); private final boolean isServerless; + @Nullable + private GcsPerProjectClientManager gcsPerProjectClientManager; public GoogleCloudStorageService() { - this.isServerless = false; + this(false, null); } - public GoogleCloudStorageService(boolean isServerless) { + public GoogleCloudStorageService(boolean isServerless, @Nullable GcsPerProjectClientManager gcsPerProjectClientManager) { this.isServerless = isServerless; + this.gcsPerProjectClientManager = gcsPerProjectClientManager; } public boolean isServerless() { @@ -132,6 +136,25 @@ public MeteredStorage client(final String clientName, final String repositoryNam } } + public MeteredStorage client( + @Nullable final ProjectId projectId, + final String clientName, + final String repositoryName, + final GcsRepositoryStatsCollector statsCollector + ) throws IOException { + if (gcsPerProjectClientManager == null) { + // single default project mode + assert ProjectId.DEFAULT.equals(projectId) : projectId; + return client(clientName, repositoryName, statsCollector); + } else if (projectId == null) { + // MP mode for cluster level client + return client(clientName, repositoryName, statsCollector); + } else { + // MP mode for per-project client + return gcsPerProjectClientManager.client(projectId, clientName, repositoryName, statsCollector); + } + } + synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() @@ -139,6 +162,20 @@ synchronized void closeRepositoryClients(String repositoryName) { .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } + void closeRepositoryClients(@Nullable ProjectId projectId, String repositoryName) { + if (gcsPerProjectClientManager == null) { + // single default project mode + assert ProjectId.DEFAULT.equals(projectId) : projectId; + closeRepositoryClients(repositoryName); + } else if (projectId == null) { + // MP mode for cluster level client + closeRepositoryClients(repositoryName); + } else { + // MP mode for per-project client + gcsPerProjectClientManager.closeRepositoryClients(projectId, repositoryName); + } + } + /** * Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe. * @@ -146,7 +183,7 @@ synchronized void closeRepositoryClients(String repositoryName) { * @return a new client storage instance that can be used to manage objects * (blobs) */ - private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) + MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) throws IOException { final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index b888a9e97f76f..ba9d669ab5043 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -113,7 +113,9 @@ public void testReinitClientSettings() throws Exception { secureSettings2.setFile("gcs.client.gcs3.credentials_file", serviceAccountFileContent("project_gcs23")); final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { - final GoogleCloudStorageService storageService = plugin.storageService; + plugin.storageService.set(plugin.createStorageService(randomBoolean(), null)); + plugin.reload(settings1); + final GoogleCloudStorageService storageService = plugin.storageService.get(); var statsCollector = new GcsRepositoryStatsCollector(); final var client11 = storageService.client("gcs1", "repo1", statsCollector); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); @@ -151,7 +153,9 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { secureSettings1.setFile("gcs.client.gcs1.credentials_file", serviceAccountFileContent("test_project")); final Settings settings = Settings.builder().setSecureSettings(secureSettings1).build(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings)) { - final GoogleCloudStorageService storageService = plugin.storageService; + plugin.storageService.set(plugin.createStorageService(randomBoolean(), null)); + plugin.reload(settings); + final GoogleCloudStorageService storageService = plugin.storageService.get(); final MeteredStorage repo1Client = storageService.client("gcs1", "repo1", new GcsRepositoryStatsCollector()); final MeteredStorage repo2Client = storageService.client("gcs1", "repo2", new GcsRepositoryStatsCollector()); From 5d94db04ebe029433953caa5114a7c39fca450f9 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 11 Apr 2025 13:39:45 +1000 Subject: [PATCH 07/10] s3 lifecycle --- .../gcs/GcsPerProjectClientManager.java | 4 +- .../s3/RepositoryCredentialsTests.java | 2 +- ...er.java => S3PerProjectClientManager.java} | 133 +++++++++++------- .../repositories/s3/S3RepositoryPlugin.java | 12 +- .../repositories/s3/S3Service.java | 18 +-- 5 files changed, 97 insertions(+), 72 deletions(-) rename modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/{PerProjectClientManager.java => S3PerProjectClientManager.java} (50%) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java index bc378ac2d08b8..9125ec0c3d1e5 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java @@ -99,7 +99,7 @@ private boolean newOrUpdated(ProjectId projectId, Map getClientSettings() { + public Map clientSettings() { return clientSettings; } diff --git a/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index f3a879962e3de..b85d9abb6422a 100644 --- a/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -256,7 +256,7 @@ S3Service s3Service( Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService, - PerProjectClientManager perProjectClientManager + S3PerProjectClientManager s3PerProjectClientManager ) { return new ProxyS3Service(environment, nodeSettings, resourceWatcherService); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java similarity index 50% rename from modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java rename to modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java index bf39a6398fe54..3da6be5817d7a 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/PerProjectClientManager.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java @@ -19,26 +19,27 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.settings.ProjectSecrets; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CachedSupplier; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.IOUtils; import java.io.Closeable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import java.util.stream.Collectors; -public class PerProjectClientManager implements ClusterStateListener { +public class S3PerProjectClientManager implements ClusterStateListener { // Original settings at node startup time private final Settings settings; private final Function clientBuilder; - // A map of per-project clients, where the key is the project ID and the value is a map of client name to client - private final Map> perProjectClientsCache; + // A map of projectId to clients holder. Adding to and removing from the map happen only with the cluster state listener thread. + private final Map perProjectClientsCache; - public PerProjectClientManager(Settings settings, Function clientBuilder) { + public S3PerProjectClientManager(Settings settings, Function clientBuilder) { this.settings = settings; this.clientBuilder = clientBuilder; this.perProjectClientsCache = new ConcurrentHashMap<>(); @@ -47,7 +48,7 @@ public PerProjectClientManager(Settings settings, Function currentProjects = event.state().metadata().projects(); - final var updatedPerProjectClients = new HashMap>(); + final var updatedPerProjectClients = new HashMap(); for (var project : currentProjects.values()) { final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); if (projectSecrets == null) { @@ -67,103 +68,127 @@ public void clusterChanged(ClusterChangedEvent event) { // TODO: Building and comparing the whole S3ClientSettings may be insufficient, we could just compare the relevant secrets if (newOrUpdated(project.id(), clientSettings)) { - updatedPerProjectClients.put( - project.id(), - clientSettings.entrySet() - .stream() - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> new ClientHolder(entry.getValue()))) - ); + updatedPerProjectClients.put(project.id(), new ClientsHolder(clientSettings)); } } + // Updated projects for (var projectId : updatedPerProjectClients.keySet()) { - final Map old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId)); + final var old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId)); if (old != null) { - IOUtils.closeWhileHandlingException(old.values()); + old.close(); } } // removed projects for (var projectId : perProjectClientsCache.keySet()) { if (currentProjects.containsKey(projectId) == false) { - final Map removed = perProjectClientsCache.remove(projectId); + final var removed = perProjectClientsCache.remove(projectId); assert removed != null; - IOUtils.closeWhileHandlingException(removed.values()); + removed.close(); } } } public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { - if (perProjectClientsCache.containsKey(projectId) == false) { + final var clientsHolder = perProjectClientsCache.get(projectId); + if (clientsHolder == null) { throw new IllegalArgumentException("project [" + projectId + "] does not exist"); } final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetadata.settings()); - final Map clientHolders = perProjectClientsCache.get(projectId); - - if (clientHolders.containsKey(clientName) == false) { - throw new IllegalArgumentException("client [" + clientName + "] does not exist"); - } - - return clientHolders.get(clientName).client(); + return clientsHolder.client(clientName); } /** * Similar to S3Service#releaseCachedClients but only clears the cache for the given project. - * All clients for the project are closed and will be recreated on next access. Also, similar to S3Service#releaseCachedClients + * All clients for the project are closed and will be recreated on next access, also similar to S3Service#releaseCachedClients */ public void clearCacheForProject(ProjectId projectId) { - final Map old = perProjectClientsCache.get(projectId); - assert old != null : projectId; - IOUtils.closeWhileHandlingException(old.values()); - perProjectClientsCache.put( - projectId, - old.entrySet() - .stream() - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> new ClientHolder(entry.getValue().clientSettings()))) - ); + final var old = perProjectClientsCache.get(projectId); + if (old != null) { + old.clearCache(); + } // TODO: do we need this? // shutdown IdleConnectionReaper background thread // it will be restarted on new client usage IdleConnectionReaper.shutdown(); } + /** + * Shutdown the manager by closing all clients holders. This is called when the node is shutting down. + */ public void close() { - for (var clientHolders : perProjectClientsCache.values()) { - IOUtils.closeWhileHandlingException(clientHolders.values()); - } - perProjectClientsCache.clear(); + IOUtils.closeWhileHandlingException(perProjectClientsCache.values()); } private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) { - if (perProjectClientsCache.containsKey(projectId) == false) { + final var old = perProjectClientsCache.get(projectId); + if (old == null) { return true; } - final var previousClientSettings = perProjectClientsCache.get(projectId) - .entrySet() - .stream() - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> entry.getValue().clientSettings())); - return currentClientSettings.equals(previousClientSettings) == false; + final var oldClientSettings = old.clientSettings(); + return currentClientSettings.equals(oldClientSettings) == false; } - private final class ClientHolder implements Closeable { - private final S3ClientSettings clientSettings; - private final CachedSupplier client; + private final class ClientsHolder implements Closeable { + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Map clientSettings; + private volatile Map clientsCache = Collections.emptyMap(); - ClientHolder(S3ClientSettings clientSettings) { + ClientsHolder(Map clientSettings) { this.clientSettings = clientSettings; - this.client = CachedSupplier.wrap(() -> new AmazonS3Reference(clientBuilder.apply(clientSettings))); } - public S3ClientSettings clientSettings() { + public Map clientSettings() { return clientSettings; } - public AmazonS3Reference client() { - return client.get(); + public AmazonS3Reference client(String clientName) { + final var clientReference = clientsCache.get(clientName); + // It is ok to retrieve an existing client when the cache is being cleared or the holder is closing. + // As long as there are paired incRef/decRef calls, the client will be closed when the last reference is released + // by either the caller of this method or the clearCache() method. + if (clientReference != null && clientReference.tryIncRef()) { + return clientReference; + } + final var settings = clientSettings.get(clientName); + if (settings == null) { + throw new IllegalArgumentException("client [" + clientName + "] does not exist"); + } + synchronized (this) { + final var existing = clientsCache.get(clientName); + if (existing != null && existing.tryIncRef()) { + return existing; + } + if (closed.get()) { + // Not adding new client once the manager is closed since there won't be anything to close it + throw new IllegalStateException("client manager is closed"); + } + // The close() method maybe called after we checked it, it is ok since we are already inside the synchronized block. + // The clearCache() will clear the newly added client. + final var newClientReference = new AmazonS3Reference(clientBuilder.apply(settings)); + newClientReference.mustIncRef(); + clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientName, newClientReference); + return newClientReference; + } } + /** + * Clear the cache by closing and clear out all clients. New {@link #client(String)} call will recreate + * the clients and populate the cache again. + */ + public synchronized void clearCache() { + IOUtils.closeWhileHandlingException(clientsCache.values()); + clientsCache = Collections.emptyMap(); + } + + /** + * Similar to {@link #clearCache()} but also flag the holder to be closed so that no new client can be created. + */ public void close() { - client.get().decRef(); + if (closed.compareAndSet(false, true)) { + clearCache(); + } } } } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index fe09dd225cebc..805925c288573 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -87,20 +87,20 @@ protected S3Repository createRepository( @Override public Collection createComponents(PluginServices services) { - PerProjectClientManager perProjectClientManager = null; + S3PerProjectClientManager s3PerProjectClientManager = null; if (services.projectResolver().supportsMultipleProjects()) { - perProjectClientManager = new PerProjectClientManager( + s3PerProjectClientManager = new S3PerProjectClientManager( settings, s3ClientSettings -> this.service.get().buildClient(s3ClientSettings) ); - services.clusterService().addListener(perProjectClientManager); + services.clusterService().addListener(s3PerProjectClientManager); } service.set( s3Service( services.environment(), services.clusterService().getSettings(), services.resourceWatcherService(), - perProjectClientManager + s3PerProjectClientManager ) ); this.service.get().refreshAndClearCache(S3ClientSettings.load(settings)); @@ -116,9 +116,9 @@ S3Service s3Service( Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService, - @Nullable PerProjectClientManager perProjectClientManager + @Nullable S3PerProjectClientManager s3PerProjectClientManager ) { - return new S3Service(environment, nodeSettings, resourceWatcherService, perProjectClientManager); + return new S3Service(environment, nodeSettings, resourceWatcherService, s3PerProjectClientManager); } @Override diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index 72a4a4e8aa4e2..52c2a9b3b8229 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -101,7 +101,7 @@ class S3Service implements Closeable { final TimeValue compareAndExchangeTimeToLive; final TimeValue compareAndExchangeAntiContentionDelay; final boolean isStateless; - private final PerProjectClientManager perProjectClientManager; + private final S3PerProjectClientManager s3PerProjectClientManager; @Deprecated(forRemoval = true) S3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { @@ -112,7 +112,7 @@ class S3Service implements Closeable { Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService, - @Nullable PerProjectClientManager perProjectClientManager + @Nullable S3PerProjectClientManager s3PerProjectClientManager ) { webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider( environment, @@ -124,7 +124,7 @@ class S3Service implements Closeable { compareAndExchangeTimeToLive = REPOSITORY_S3_CAS_TTL_SETTING.get(nodeSettings); compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings); isStateless = DiscoveryNode.isStateless(nodeSettings); - this.perProjectClientManager = perProjectClientManager; + this.s3PerProjectClientManager = s3PerProjectClientManager; } /** @@ -175,7 +175,7 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) { * per-project client manager. Throws if project-id or the client does not exist. The client maybe initialized lazily. */ public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) { - if (perProjectClientManager == null) { + if (s3PerProjectClientManager == null) { // Multi-Project is disabled and we have a single default project assert ProjectId.DEFAULT.equals(projectId) : projectId; return client(repositoryMetadata); @@ -183,7 +183,7 @@ public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadat // Multi-Project is enabled and we are retrieving a client for the cluster level blobstore return client(repositoryMetadata); } else { - return perProjectClientManager.client(projectId, repositoryMetadata); + return s3PerProjectClientManager.client(projectId, repositoryMetadata); } } @@ -342,7 +342,7 @@ public void onBlobStoreClose() { } public void onBlobStoreClose(@Nullable ProjectId projectId) { - if (perProjectClientManager == null) { + if (s3PerProjectClientManager == null) { // Multi-Project is disabled and we have a single default project assert ProjectId.DEFAULT.equals(projectId) : projectId; onBlobStoreClose(); @@ -350,15 +350,15 @@ public void onBlobStoreClose(@Nullable ProjectId projectId) { // Multi-Project is enabled and this is for the cluster level blobstore onBlobStoreClose(); } else { - perProjectClientManager.clearCacheForProject(projectId); + s3PerProjectClientManager.clearCacheForProject(projectId); } } @Override public void close() throws IOException { releaseCachedClients(); - if (perProjectClientManager != null) { - perProjectClientManager.close(); + if (s3PerProjectClientManager != null) { + s3PerProjectClientManager.close(); } webIdentityTokenCredentialsProvider.shutdown(); } From 35cd05b52b076ebb93d67bf6737c7c67c373a9b7 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 11 Apr 2025 14:17:15 +1000 Subject: [PATCH 08/10] tweak --- .../gcs/GcsPerProjectClientManager.java | 31 ++++++++++--------- .../s3/S3PerProjectClientManager.java | 21 ++++++------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java index 9125ec0c3d1e5..32dda1088d94c 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java @@ -25,7 +25,7 @@ public class GcsPerProjectClientManager implements ClusterStateListener { private final Settings settings; private final BiFunction clientBuilder; - private final Map perProjectClientsCache; + private final Map perProjectClientsCache; public GcsPerProjectClientManager( Settings settings, @@ -40,7 +40,7 @@ public GcsPerProjectClientManager( public void clusterChanged(ClusterChangedEvent event) { final Map currentProjects = event.state().metadata().projects(); - final var updatedPerProjectClients = new HashMap(); + final var updatedPerProjectClients = new HashMap(); for (var project : currentProjects.values()) { final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); if (projectSecrets == null) { @@ -61,9 +61,10 @@ public void clusterChanged(ClusterChangedEvent event) { // TODO: Building and comparing the whole GoogleCloudStorageClientSettings may be insufficient, we could just compare the // relevant secrets if (newOrUpdated(project.id(), clientSettings)) { - updatedPerProjectClients.put(project.id(), new ClientHolder(clientSettings)); + updatedPerProjectClients.put(project.id(), new ClientsHolder(clientSettings)); } } + // Updated projects perProjectClientsCache.putAll(updatedPerProjectClients); @@ -81,39 +82,39 @@ public MeteredStorage client( String repositoryName, GcsRepositoryStatsCollector statsCollector ) { - final var clientHolder = perProjectClientsCache.get(projectId); - if (clientHolder == null) { + final var clientsHolder = perProjectClientsCache.get(projectId); + if (clientsHolder == null) { throw new IllegalArgumentException("No project found for [" + projectId + "]"); } - return clientHolder.client(clientName, repositoryName, statsCollector); + return clientsHolder.client(clientName, repositoryName, statsCollector); } public void closeRepositoryClients(ProjectId projectId, String repositoryName) { - final var clientHolder = perProjectClientsCache.get(projectId); - if (clientHolder != null) { - clientHolder.closeRepositoryClients(repositoryName); + final var clientsHolder = perProjectClientsCache.get(projectId); + if (clientsHolder != null) { + clientsHolder.closeRepositoryClients(repositoryName); } } private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) { - if (perProjectClientsCache.containsKey(projectId) == false) { + final ClientsHolder old = perProjectClientsCache.get(projectId); + if (old == null) { return true; } - final var previousClientSettings = perProjectClientsCache.get(projectId).clientSettings(); - return currentClientSettings.equals(previousClientSettings) == false; + return currentClientSettings.equals(old.clientSettings()) == false; } - private final class ClientHolder { + private final class ClientsHolder { // clientName -> client settings private final Map clientSettings; // repositoryName -> client private final Map clientCache = new ConcurrentHashMap<>(); - ClientHolder(Map clientSettings) { + ClientsHolder(Map clientSettings) { this.clientSettings = clientSettings; } - public Map clientSettings() { + Map clientSettings() { return clientSettings; } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java index 3da6be5817d7a..86a98ae9fc048 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java @@ -32,10 +32,8 @@ public class S3PerProjectClientManager implements ClusterStateListener { - // Original settings at node startup time private final Settings settings; private final Function clientBuilder; - // A map of projectId to clients holder. Adding to and removing from the map happen only with the cluster state listener thread. private final Map perProjectClientsCache; @@ -107,11 +105,11 @@ public void clearCacheForProject(ProjectId projectId) { final var old = perProjectClientsCache.get(projectId); if (old != null) { old.clearCache(); + // TODO: do we need this? + // shutdown IdleConnectionReaper background thread + // it will be restarted on new client usage + IdleConnectionReaper.shutdown(); } - // TODO: do we need this? - // shutdown IdleConnectionReaper background thread - // it will be restarted on new client usage - IdleConnectionReaper.shutdown(); } /** @@ -126,8 +124,7 @@ private boolean newOrUpdated(ProjectId projectId, Map if (old == null) { return true; } - final var oldClientSettings = old.clientSettings(); - return currentClientSettings.equals(oldClientSettings) == false; + return currentClientSettings.equals(old.clientSettings()) == false; } private final class ClientsHolder implements Closeable { @@ -139,11 +136,11 @@ private final class ClientsHolder implements Closeable { this.clientSettings = clientSettings; } - public Map clientSettings() { + Map clientSettings() { return clientSettings; } - public AmazonS3Reference client(String clientName) { + AmazonS3Reference client(String clientName) { final var clientReference = clientsCache.get(clientName); // It is ok to retrieve an existing client when the cache is being cleared or the holder is closing. // As long as there are paired incRef/decRef calls, the client will be closed when the last reference is released @@ -174,10 +171,10 @@ public AmazonS3Reference client(String clientName) { } /** - * Clear the cache by closing and clear out all clients. New {@link #client(String)} call will recreate + * Clear the cache by closing and clear out all clients. Subsequent {@link #client(String)} calls will recreate * the clients and populate the cache again. */ - public synchronized void clearCache() { + synchronized void clearCache() { IOUtils.closeWhileHandlingException(clientsCache.values()); clientsCache = Collections.emptyMap(); } From b5e8fb19e57b70b2ad538f4fae6157dfa6b67fee Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 11 Apr 2025 16:37:26 +1000 Subject: [PATCH 09/10] fork client closing --- .../s3/S3PerProjectClientManager.java | 16 ++++++++++++---- .../repositories/s3/S3RepositoryPlugin.java | 4 +++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java index 86a98ae9fc048..8834ff0037ed4 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java @@ -23,10 +23,13 @@ import org.elasticsearch.core.IOUtils; import java.io.Closeable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -34,12 +37,14 @@ public class S3PerProjectClientManager implements ClusterStateListener { private final Settings settings; private final Function clientBuilder; + private final Executor executor; // A map of projectId to clients holder. Adding to and removing from the map happen only with the cluster state listener thread. private final Map perProjectClientsCache; - public S3PerProjectClientManager(Settings settings, Function clientBuilder) { + public S3PerProjectClientManager(Settings settings, Function clientBuilder, Executor executor) { this.settings = settings; this.clientBuilder = clientBuilder; + this.executor = executor; this.perProjectClientsCache = new ConcurrentHashMap<>(); } @@ -70,22 +75,25 @@ public void clusterChanged(ClusterChangedEvent event) { } } + final List clientsHoldersToClose = new ArrayList<>(); // Updated projects for (var projectId : updatedPerProjectClients.keySet()) { final var old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId)); if (old != null) { - old.close(); + clientsHoldersToClose.add(old); } } - // removed projects for (var projectId : perProjectClientsCache.keySet()) { if (currentProjects.containsKey(projectId) == false) { final var removed = perProjectClientsCache.remove(projectId); assert removed != null; - removed.close(); + clientsHoldersToClose.add(removed); } } + if (clientsHoldersToClose.isEmpty() == false) { + executor.execute(() -> IOUtils.closeWhileHandlingException(clientsHoldersToClose)); + } } public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 805925c288573..7746838701063 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -27,6 +27,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -91,7 +92,8 @@ public Collection createComponents(PluginServices services) { if (services.projectResolver().supportsMultipleProjects()) { s3PerProjectClientManager = new S3PerProjectClientManager( settings, - s3ClientSettings -> this.service.get().buildClient(s3ClientSettings) + s3ClientSettings -> this.service.get().buildClient(s3ClientSettings), + services.threadPool().executor(ThreadPool.Names.GENERIC) ); services.clusterService().addListener(s3PerProjectClientManager); } From ffbe479f2680e3d1f4f21100755758d67ecd1702 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 11 Apr 2025 20:48:24 +1000 Subject: [PATCH 10/10] block waiting for async client closing on shutdown --- .../s3/S3PerProjectClientManager.java | 39 ++++++++++++++++++- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java index 8834ff0037ed4..df461cffc3c88 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java @@ -12,6 +12,8 @@ import com.amazonaws.http.IdleConnectionReaper; import com.amazonaws.services.s3.AmazonS3; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.ProjectId; @@ -29,7 +31,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -38,8 +42,10 @@ public class S3PerProjectClientManager implements ClusterStateListener { private final Settings settings; private final Function clientBuilder; private final Executor executor; - // A map of projectId to clients holder. Adding to and removing from the map happen only with the cluster state listener thread. + // A map of projectId to clients holder. Adding to and removing from the map happen only in the cluster state listener thread. private final Map perProjectClientsCache; + // Listener for tracking ongoing async closing of obsolete clients. Updated only in the cluster state listener thread. + private volatile SubscribableListener clientsCloseListener = null; public S3PerProjectClientManager(Settings settings, Function clientBuilder, Executor executor) { this.settings = settings; @@ -92,10 +98,26 @@ public void clusterChanged(ClusterChangedEvent event) { } } if (clientsHoldersToClose.isEmpty() == false) { - executor.execute(() -> IOUtils.closeWhileHandlingException(clientsHoldersToClose)); + final var currentClientsCloseListener = new SubscribableListener(); + final var previousClientsCloseListener = clientsCloseListener; + clientsCloseListener = currentClientsCloseListener; + if (previousClientsCloseListener != null && previousClientsCloseListener.isDone() == false) { + previousClientsCloseListener.addListener( + ActionListener.running(() -> closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener)) + ); + } else { + closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener); + } } } + private void closeClientsAsync(List clientsHoldersToClose, ActionListener listener) { + executor.execute(() -> { + IOUtils.closeWhileHandlingException(clientsHoldersToClose); + listener.onResponse(null); + }); + } + public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { final var clientsHolder = perProjectClientsCache.get(projectId); if (clientsHolder == null) { @@ -125,6 +147,19 @@ public void clearCacheForProject(ProjectId projectId) { */ public void close() { IOUtils.closeWhileHandlingException(perProjectClientsCache.values()); + final var currentClientsCloseListener = clientsCloseListener; + if (currentClientsCloseListener != null && currentClientsCloseListener.isDone() == false) { + // Wait for async clients closing to be completed + final CountDownLatch latch = new CountDownLatch(1); + currentClientsCloseListener.addListener(ActionListener.running(latch::countDown)); + try { + if (latch.await(1, TimeUnit.MINUTES) == false) { + // TODO: log warning + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) {