From bd1f43c22ab214e95f5dd0c3de5dec2f27b2b6c8 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Tue, 20 May 2025 20:40:58 +0800 Subject: [PATCH 1/2] Making snapshot store/restore rate dynamic Signed-off-by: kkewwei Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../repositories/hdfs/HdfsTests.java | 6 +- .../repositories/RepositoriesServiceIT.java | 133 +++++++++++++++++- .../snapshots/ConcurrentSnapshotsIT.java | 9 +- .../repositories/RepositoriesService.java | 49 +++++-- .../opensearch/repositories/Repository.java | 4 + .../blobstore/BlobStoreRepository.java | 125 +++++++++++++--- 7 files changed, 286 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b362d4d2cf35..56f8d541c3317 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))) +- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069) ### Dependencies - Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961)) diff --git a/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsTests.java index 297a3052cc6d9..aabeb34b80ae5 100644 --- a/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsTests.java +++ b/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsTests.java @@ -160,7 +160,7 @@ public void testSimpleWorkflow() { public void testMissingUri() { try { - OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", Settings.builder()); + OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo1", "hdfs", Settings.builder()); fail(); } catch (RepositoryException e) { assertTrue(e.getCause() instanceof IllegalArgumentException); @@ -193,7 +193,7 @@ public void testNonHdfsUri() { public void testPathSpecifiedInHdfs() { try { Settings.Builder settings = Settings.builder().put("uri", "hdfs:///some/path"); - OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings); + OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo2", "hdfs", settings); fail(); } catch (RepositoryException e) { assertTrue(e.getCause() instanceof IllegalArgumentException); @@ -204,7 +204,7 @@ public void testPathSpecifiedInHdfs() { public void testMissingPath() { try { Settings.Builder settings = Settings.builder().put("uri", "hdfs:///"); - OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings); + OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo3", "hdfs", settings); fail(); } catch (RepositoryException e) { assertTrue(e.getCause() instanceof IllegalArgumentException); diff --git a/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java index b5d5bddd160ca..5f86fd791821e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java @@ -32,6 +32,7 @@ package org.opensearch.repositories; +import 
org.apache.lucene.store.RateLimiter; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.Settings; @@ -42,12 +43,18 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.transport.client.Client; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_UPLOAD_BYTES_PER_SEC; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -138,10 +145,11 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted // create repository final String repositoryName = "test-repo"; + Path path = randomRepoPath(); Settings.Builder repoSettings = Settings.builder() - .put("location", randomRepoPath()) - .put("max_snapshot_bytes_per_sec", "10mb") - .put("max_restore_bytes_per_sec", "10mb"); + .put("location", path) + .put(MAX_SNAPSHOT_BYTES_PER_SEC, "10mb") + .put(MAX_RESTORE_BYTES_PER_SEC, "10mb"); OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( client().admin().cluster(), repositoryName, @@ -176,7 +184,7 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted try { logger.info("--> begin to reset repository"); - repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb"); + repoSettings = Settings.builder().put("location", randomRepoPath()).put(MAX_SNAPSHOT_BYTES_PER_SEC, "300mb"); OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( client().admin().cluster(), repositoryName, @@ -194,4 +202,121 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted thread.join(); } + + public void testAdjustBytesPerSecSettingForSnapAndRestore() { + final InternalTestCluster cluster = internalCluster(); + final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class) + .iterator() + .next(); + + // create repository + final String repositoryName = "test-repo1"; + long rateBytes = 200000; + Path path = randomRepoPath(); + Settings.Builder repoSettings = Settings.builder() + .put("location", path) + .put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b")); + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( + client().admin().cluster(), + repositoryName, + FsRepository.TYPE, + true, + repoSettings + ); + + FsRepository repository = (FsRepository) 
repositoriesService.repository(repositoryName); + RateLimiter snapshotRateLimiter = repository.snapshotRateLimiter(); + assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + RateLimiter restoreRateLimiter = repository.restoreRateLimiter(); + assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + RateLimiter remoteUploadRateLimiter = repository.remoteUploadRateLimiter(); + assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + RateLimiter remoteUploadLowPriorityRateLimiter = repository.remoteUploadLowPriorityRateLimiter(); + assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + RateLimiter remoteDownloadRateLimiter = repository.remoteDownloadRateLimiter(); + assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + + // adjust all the reloadable settings + { + rateBytes = rateBytes / 2; + repoSettings = Settings.builder() + .put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b")); + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( + client().admin().cluster(), + repositoryName, + FsRepository.TYPE, + true, + repoSettings + ); + FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName); + assertThat(newRepository, sameInstance(repository)); + snapshotRateLimiter = newRepository.snapshotRateLimiter(); + assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + restoreRateLimiter = newRepository.restoreRateLimiter(); + assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter(); + assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter(); + assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter(); + assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + } + + // In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings + { + long newRateBytes = rateBytes / 2; + repoSettings = Settings.builder() + .put("location", path) + .put(MAX_SNAPSHOT_BYTES_PER_SEC, (newRateBytes + "b")) + .put(MAX_RESTORE_BYTES_PER_SEC, (newRateBytes + "b")); + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( + client().admin().cluster(), + repositoryName, + FsRepository.TYPE, + true, + repoSettings + ); + FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName); + assertThat(newRepository, sameInstance(repository)); + snapshotRateLimiter = newRepository.snapshotRateLimiter(); + assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024))); + restoreRateLimiter = newRepository.restoreRateLimiter(); + assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024))); + remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter(); + 
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter(); + assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter(); + assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024))); + } + + // the new settings are not all equal to the old settings, so the repository will be not reloaded + { + rateBytes = rateBytes / 2; + repoSettings = Settings.builder() + .put("location", path) + .put("io_buffer_size", "8mb") + .put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b")) + .put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b")); + OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides( + client().admin().cluster(), + repositoryName, + FsRepository.TYPE, + true, + repoSettings + ); + FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName); + assertNotEquals(newRepository, repository); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java index 252efcdc979bb..af17ef596b6a4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java @@ -50,6 +50,7 @@ import org.opensearch.common.util.concurrent.UncategorizedExecutionException; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.AbstractDisruptionTestCase; import org.opensearch.plugins.Plugin; @@ -154,6 +155,7 @@ public void testSettingsUpdateFailWhenCreateSnapshotInProgress() throws Exceptio Thread.sleep(1000); // Wait for the snapshot to start assertFalse(createSlowFuture.isDone()); // Ensure the snapshot is still in progress // Attempt to update the repository settings while the snapshot is in progress + settings.put("chunk_size", 2000, ByteSizeUnit.BYTES); IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", settings)); // Verify that the update fails with an appropriate exception assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage()); @@ -180,10 +182,9 @@ public void testSettingsUpdateFailWhenDeleteSnapshotInProgress() throws Interrup Thread.sleep(1000); // Wait for the delete operation to start assertFalse(future.isDone()); // Ensure the delete operation is still in progress // Attempt to update the repository settings while the delete operation is in progress - IllegalStateException ex = assertThrows( - IllegalStateException.class, - () -> updateRepository(repoName, "mock", randomRepositorySettings()) - ); + Settings.Builder newSettings = randomRepositorySettings(); + newSettings.put("chunk_size", 2000, ByteSizeUnit.BYTES); + IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", newSettings)); // Verify that the update fails with an appropriate exception assertEquals("trying to modify or 
unregister repository that is currently used", ex.getMessage()); unblockNode(repoName, clusterManagerName); // Unblock the delete operation diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 994c981b8bb0c..1060c946ae799 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -174,7 +174,7 @@ public RepositoriesService( public void registerOrUpdateRepository(final PutRepositoryRequest request, final ActionListener listener) { assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]"; - final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata( + RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata( request.name(), request.type(), request.settings(), @@ -206,14 +206,32 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final registrationListener = listener; } - // Trying to create the new repository on cluster-manager to make sure it works - try { - closeRepository(createRepository(newRepositoryMetadata, typesRegistry)); - } catch (Exception e) { - registrationListener.onFailure(e); - return; + Repository currentRepository = repositories.get(request.name()); + boolean isReloadableSettings = currentRepository != null && currentRepository.isReloadableSettings(newRepositoryMetadata); + + if (isReloadableSettings) { + // We are reloading the repository, so we need to preserve the old settings in the new repository metadata + Settings updatedSettings = Settings.builder() + .put(currentRepository.getMetadata().settings()) + .put(newRepositoryMetadata.settings()) + .build(); + newRepositoryMetadata = new RepositoryMetadata( + newRepositoryMetadata.name(), + newRepositoryMetadata.type(), + updatedSettings, + newRepositoryMetadata.cryptoMetadata() + ); + } else { + // Trying to create the new repository on cluster-manager to make sure it works + try { + closeRepository(createRepository(newRepositoryMetadata, typesRegistry)); + } catch (Exception e) { + registrationListener.onFailure(e); + return; + } } + final RepositoryMetadata finalRepositoryMetadata = newRepositoryMetadata; clusterService.submitStateUpdateTask( "put_repository [" + request.name() + "]", new AckedClusterStateUpdateTask(request, registrationListener) { @@ -224,7 +242,9 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) { - ensureRepositoryNotInUse(currentState, request.name()); + if (isReloadableSettings == false) { + ensureRepositoryNotInUse(currentState, request.name()); + } Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE); @@ -245,17 +265,17 @@ public ClusterState execute(ClusterState currentState) { List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { - RepositoryMetadata updatedRepositoryMetadata = newRepositoryMetadata; + RepositoryMetadata updatedRepositoryMetadata = finalRepositoryMetadata; if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) { Settings updatedSettings = Settings.builder() - .put(newRepositoryMetadata.settings()) + 
.put(finalRepositoryMetadata.settings()) .put(SYSTEM_REPOSITORY_SETTING.getKey(), true) .build(); updatedRepositoryMetadata = new RepositoryMetadata( - newRepositoryMetadata.name(), - newRepositoryMetadata.type(), + finalRepositoryMetadata.name(), + finalRepositoryMetadata.type(), updatedSettings, - newRepositoryMetadata.cryptoMetadata() + finalRepositoryMetadata.cryptoMetadata() ); } if (repositoryMetadata.name().equals(updatedRepositoryMetadata.name())) { @@ -481,7 +501,8 @@ public void applyClusterState(ClusterChangedEvent event) { if (previousMetadata.type().equals(repositoryMetadata.type()) == false || previousMetadata.settings().equals(repositoryMetadata.settings()) == false) { // Previous version is different from the version in settings - if (repository.isSystemRepository() && repository.isReloadable()) { + if ((repository.isSystemRepository() && repository.isReloadable()) + || repository.isReloadableSettings(repositoryMetadata)) { logger.debug( "updating repository [{}] in-place to use new metadata [{}]", repositoryMetadata.name(), diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 259c4a6e09ce7..521187f48b375 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -602,6 +602,10 @@ default boolean isReloadable() { return false; } + default boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) { + return false; + } + /** * Reload the repository inplace */ diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 940942b816536..a1ae20db7f5fd 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -264,6 +264,46 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold(); + public static final String MAX_SNAPSHOT_BYTES_PER_SEC = "max_snapshot_bytes_per_sec"; + + public static final Setting SNAPSHOT_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + MAX_SNAPSHOT_BYTES_PER_SEC, + new ByteSizeValue(40, ByteSizeUnit.MB), + Setting.Property.NodeScope + ); + + public static final String MAX_RESTORE_BYTES_PER_SEC = "max_restore_bytes_per_sec"; + + public static final Setting RESTORE_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + MAX_RESTORE_BYTES_PER_SEC, + ByteSizeValue.ZERO, + Setting.Property.NodeScope + ); + + public static final String MAX_REMOTE_UPLOAD_BYTES_PER_SEC = "max_remote_upload_bytes_per_sec"; + + public static final Setting MAX_REMOTE_UPLOAD_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + MAX_REMOTE_UPLOAD_BYTES_PER_SEC, + ByteSizeValue.ZERO, + Setting.Property.NodeScope + ); + + public static final String MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC = "max_remote_low_priority_upload_bytes_per_sec"; + + public static final Setting MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, + ByteSizeValue.ZERO, + Setting.Property.NodeScope + ); + + public static final String MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC = "max_remote_download_bytes_per_sec"; + + public static final Setting 
MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, + ByteSizeValue.ZERO, + Setting.Property.NodeScope + ); + /** * Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions. * This ensures compatibility across various JDK versions. For a practical usage example, @@ -328,6 +368,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + private static final Set RELOADABLE_SETTINGS = Set.of( + MAX_RESTORE_BYTES_PER_SEC, + MAX_SNAPSHOT_BYTES_PER_SEC, + MAX_REMOTE_UPLOAD_BYTES_PER_SEC, + MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, + MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC + ); + public static long calculateDefaultSnapshotRepositoryDataCacheThreshold() { return Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2); } @@ -592,15 +640,11 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) { this.metadata = repositoryMetadata; supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); - snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); - restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); - remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO); - remoteUploadLowPriorityRateLimiter = getRateLimiter( - metadata.settings(), - "max_remote_low_priority_upload_bytes_per_sec", - ByteSizeValue.ZERO - ); - remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO); + snapshotRateLimiter = getRateLimiter(SNAPSHOT_BYTES_PER_SEC_SETTING, metadata.settings()); + restoreRateLimiter = getRateLimiter(RESTORE_BYTES_PER_SEC_SETTING, metadata.settings()); + remoteUploadRateLimiter = getRateLimiter(MAX_REMOTE_UPLOAD_BYTES_PER_SEC_SETTING, metadata.settings()); + remoteUploadLowPriorityRateLimiter = getRateLimiter(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC_SETTING, metadata.settings()); + remoteDownloadRateLimiter = getRateLimiter(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC_SETTING, metadata.settings()); readOnly = READONLY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); @@ -2891,17 +2935,16 @@ private BlobPath shardPath(IndexId indexId, int shardId) { /** * Configures RateLimiter based on repository and global settings * - * @param repositorySettings repository settings - * @param setting setting to use to configure rate limiter - * @param defaultRate default limiting rate + * @param bytesPerSecSetting setting to use to configure rate limiter + * @param settings repository settings * @return rate limiter or null of no throttling is needed */ - private RateLimiter getRateLimiter(Settings repositorySettings, String setting, ByteSizeValue defaultRate) { - ByteSizeValue maxSnapshotBytesPerSec = repositorySettings.getAsBytesSize(setting, defaultRate); - if (maxSnapshotBytesPerSec.getBytes() <= 0) { + private RateLimiter getRateLimiter(Setting bytesPerSecSetting, Settings settings) { + ByteSizeValue maxByteSize = bytesPerSecSetting.get(settings); + if (maxByteSize.getBytes() <= 0) { return null; } else { - return new RateLimiter.SimpleRateLimiter(maxSnapshotBytesPerSec.getMbFrac()); + return new 
RateLimiter.SimpleRateLimiter(maxByteSize.getMbFrac()); } } @@ -4326,6 +4369,31 @@ public InputStream maybeRateLimitSnapshots(InputStream stream) { return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT); } + // Visible for testing + public RateLimiter snapshotRateLimiter() { + return snapshotRateLimiter; + } + + // Visible for testing + public RateLimiter restoreRateLimiter() { + return restoreRateLimiter; + } + + // Visible for testing + public RateLimiter remoteUploadRateLimiter() { + return remoteUploadRateLimiter; + } + + // Visible for testing + public RateLimiter remoteUploadLowPriorityRateLimiter() { + return remoteUploadLowPriorityRateLimiter; + } + + // Visible for testing + public RateLimiter remoteDownloadRateLimiter() { + return remoteDownloadRateLimiter; + } + @Override public List> getRestrictedSystemRepositorySettings() { return Arrays.asList(SYSTEM_REPOSITORY_SETTING, READONLY_SETTING, REMOTE_STORE_INDEX_SHALLOW_COPY); @@ -4715,6 +4783,31 @@ private static Optional extractShallowSnapshotUUID(String blobName) { return Optional.empty(); } + @Override + public boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) { + if (metadata.name().equals(newRepositoryMetadata.name()) == false + || metadata.type().equals(newRepositoryMetadata.type()) == false + || Objects.equals(metadata.cryptoMetadata(), newRepositoryMetadata.cryptoMetadata()) == false) { + return false; + } + Settings newSettings = newRepositoryMetadata.settings(); + if (RELOADABLE_SETTINGS.containsAll(newSettings.keySet())) { + // the new settings are all contained in RELOADABLE_SETTINGS + return true; + } else { + Settings currentSettings = metadata.settings(); + // In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings + Set allKeys = Stream.concat(newSettings.keySet().stream(), currentSettings.keySet().stream()) + .filter(key -> !RELOADABLE_SETTINGS.contains(key)) + .collect(Collectors.toSet()); + return allKeys.stream().allMatch(key -> areSettingsEqual(newSettings, currentSettings, key)); + } + } + + private boolean areSettingsEqual(Settings s1, Settings s2, String key) { + return s1.hasValue(key) == s2.hasValue(key) && (s1.hasValue(key) ? Objects.equals(s1.get(key), s2.get(key)) : true); + } + /** * The result of removing a snapshot from a shard folder in the repository. 
 */

From 9c6112796319d8a525c77b5aa4b4ab8fd34e87f4 Mon Sep 17 00:00:00 2001
From: kkewwei
Date: Mon, 30 Jun 2025 21:20:15 +0800
Subject: [PATCH 2/2] Simplify the code

Signed-off-by: kkewwei
Signed-off-by: kkewwei
---
 .../repositories/blobstore/BlobStoreRepository.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index a1ae20db7f5fd..7cdbc31563654 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -4800,12 +4800,12 @@ public boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) {
             Set<String> allKeys = Stream.concat(newSettings.keySet().stream(), currentSettings.keySet().stream())
                 .filter(key -> !RELOADABLE_SETTINGS.contains(key))
                 .collect(Collectors.toSet());
-            return allKeys.stream().allMatch(key -> areSettingsEqual(newSettings, currentSettings, key));
+            return allKeys.stream().allMatch(key -> isSettingEqual(newSettings, currentSettings, key));
         }
     }

-    private boolean areSettingsEqual(Settings s1, Settings s2, String key) {
-        return s1.hasValue(key) == s2.hasValue(key) && (s1.hasValue(key) ? Objects.equals(s1.get(key), s2.get(key)) : true);
+    private boolean isSettingEqual(Settings s1, Settings s2, String key) {
+        return Objects.equals(s1.get(key), s2.get(key));
     }

     /**
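
Usage note (illustrative, not part of the patch): once the rate-limit settings above are reloadable, throttling can be changed on a live repository without unregistering it. This minimal sketch mirrors the RepositoriesServiceIT test and reuses the helper, constants, and client calls shown in this patch; the repository name "test-repo" and the "20mb" values are assumed placeholders.

    // Minimal sketch, assuming a registered "fs" repository named "test-repo" and the
    // MAX_*_BYTES_PER_SEC keys defined in BlobStoreRepository in this patch.
    Settings.Builder repoSettings = Settings.builder()
        .put(MAX_SNAPSHOT_BYTES_PER_SEC, "20mb")  // reloadable snapshot throttle
        .put(MAX_RESTORE_BYTES_PER_SEC, "20mb");  // reloadable restore throttle
    // Only keys from RELOADABLE_SETTINGS are supplied, so RepositoriesService merges them into
    // the existing repository metadata and the RateLimiters are refreshed in place; the
    // Repository instance itself is not closed and recreated.
    OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
        client().admin().cluster(),   // same arguments as the integration test above
        "test-repo",
        FsRepository.TYPE,
        true,
        repoSettings
    );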