Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
- Update SecureAuxTransportSettingsProvider to distinguish between aux transport types ([#18616](https://github.com/opensearch-project/OpenSearch/pull/18616))
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)

### Dependencies
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testSimpleWorkflow() {

public void testMissingUri() {
try {
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", Settings.builder());
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo1", "hdfs", Settings.builder());
fail();
} catch (RepositoryException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
Expand Down Expand Up @@ -193,7 +193,7 @@ public void testNonHdfsUri() {
public void testPathSpecifiedInHdfs() {
try {
Settings.Builder settings = Settings.builder().put("uri", "hdfs:///some/path");
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings);
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo2", "hdfs", settings);
fail();
} catch (RepositoryException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
Expand All @@ -204,7 +204,7 @@ public void testPathSpecifiedInHdfs() {
public void testMissingPath() {
try {
Settings.Builder settings = Settings.builder().put("uri", "hdfs:///");
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings);
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo3", "hdfs", settings);
fail();
} catch (RepositoryException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -42,12 +43,18 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Client;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_UPLOAD_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -138,10 +145,11 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted

// create repository
final String repositoryName = "test-repo";
Path path = randomRepoPath();
Settings.Builder repoSettings = Settings.builder()
.put("location", randomRepoPath())
.put("max_snapshot_bytes_per_sec", "10mb")
.put("max_restore_bytes_per_sec", "10mb");
.put("location", path)
.put(MAX_SNAPSHOT_BYTES_PER_SEC, "10mb")
.put(MAX_RESTORE_BYTES_PER_SEC, "10mb");
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
Expand Down Expand Up @@ -176,7 +184,7 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted

try {
logger.info("--> begin to reset repository");
repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb");
repoSettings = Settings.builder().put("location", randomRepoPath()).put(MAX_SNAPSHOT_BYTES_PER_SEC, "300mb");
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
Expand All @@ -194,4 +202,121 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted

thread.join();
}

public void testAdjustBytesPerSecSettingForSnapAndRestore() {
final InternalTestCluster cluster = internalCluster();
final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class)
.iterator()
.next();

// create repository
final String repositoryName = "test-repo1";
long rateBytes = 200000;
Path path = randomRepoPath();
Settings.Builder repoSettings = Settings.builder()
.put("location", path)
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);

FsRepository repository = (FsRepository) repositoriesService.repository(repositoryName);
RateLimiter snapshotRateLimiter = repository.snapshotRateLimiter();
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter restoreRateLimiter = repository.restoreRateLimiter();
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter remoteUploadRateLimiter = repository.remoteUploadRateLimiter();
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter remoteUploadLowPriorityRateLimiter = repository.remoteUploadLowPriorityRateLimiter();
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter remoteDownloadRateLimiter = repository.remoteDownloadRateLimiter();
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));

// adjust all the reloadable settings
{
rateBytes = rateBytes / 2;
repoSettings = Settings.builder()
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
assertThat(newRepository, sameInstance(repository));
snapshotRateLimiter = newRepository.snapshotRateLimiter();
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
restoreRateLimiter = newRepository.restoreRateLimiter();
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter();
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter();
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter();
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
}

// In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings
{
long newRateBytes = rateBytes / 2;
repoSettings = Settings.builder()
.put("location", path)
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (newRateBytes + "b"))
.put(MAX_RESTORE_BYTES_PER_SEC, (newRateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
assertThat(newRepository, sameInstance(repository));
snapshotRateLimiter = newRepository.snapshotRateLimiter();
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024)));
restoreRateLimiter = newRepository.restoreRateLimiter();
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024)));
remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter();
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter();
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter();
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
}

// the new settings are not all equal to the old settings, so the repository will be not reloaded
{
rateBytes = rateBytes / 2;
repoSettings = Settings.builder()
.put("location", path)
.put("io_buffer_size", "8mb")
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
assertNotEquals(newRepository, repository);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.AbstractDisruptionTestCase;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -154,6 +155,7 @@ public void testSettingsUpdateFailWhenCreateSnapshotInProgress() throws Exceptio
Thread.sleep(1000); // Wait for the snapshot to start
assertFalse(createSlowFuture.isDone()); // Ensure the snapshot is still in progress
// Attempt to update the repository settings while the snapshot is in progress
settings.put("chunk_size", 2000, ByteSizeUnit.BYTES);
IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", settings));
// Verify that the update fails with an appropriate exception
assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage());
Expand All @@ -180,10 +182,9 @@ public void testSettingsUpdateFailWhenDeleteSnapshotInProgress() throws Interrup
Thread.sleep(1000); // Wait for the delete operation to start
assertFalse(future.isDone()); // Ensure the delete operation is still in progress
// Attempt to update the repository settings while the delete operation is in progress
IllegalStateException ex = assertThrows(
IllegalStateException.class,
() -> updateRepository(repoName, "mock", randomRepositorySettings())
);
Settings.Builder newSettings = randomRepositorySettings();
newSettings.put("chunk_size", 2000, ByteSizeUnit.BYTES);
IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", newSettings));
// Verify that the update fails with an appropriate exception
assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage());
unblockNode(repoName, clusterManagerName); // Unblock the delete operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public RepositoriesService(
public void registerOrUpdateRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";

final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
request.name(),
request.type(),
request.settings(),
Expand Down Expand Up @@ -206,14 +206,32 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
registrationListener = listener;
}

// Trying to create the new repository on cluster-manager to make sure it works
try {
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
Repository currentRepository = repositories.get(request.name());
boolean isReloadableSettings = currentRepository != null && currentRepository.isReloadableSettings(newRepositoryMetadata);

if (isReloadableSettings) {
// We are reloading the repository, so we need to preserve the old settings in the new repository metadata
Settings updatedSettings = Settings.builder()
.put(currentRepository.getMetadata().settings())
.put(newRepositoryMetadata.settings())
.build();
newRepositoryMetadata = new RepositoryMetadata(
newRepositoryMetadata.name(),
newRepositoryMetadata.type(),
updatedSettings,
newRepositoryMetadata.cryptoMetadata()
);
} else {
// Trying to create the new repository on cluster-manager to make sure it works
try {
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
}
}

final RepositoryMetadata finalRepositoryMetadata = newRepositoryMetadata;
clusterService.submitStateUpdateTask(
"put_repository [" + request.name() + "]",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
Expand All @@ -224,7 +242,9 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name());
if (isReloadableSettings == false) {
ensureRepositoryNotInUse(currentState, request.name());
}
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
Expand All @@ -245,17 +265,17 @@ public ClusterState execute(ClusterState currentState) {
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
RepositoryMetadata updatedRepositoryMetadata = newRepositoryMetadata;
RepositoryMetadata updatedRepositoryMetadata = finalRepositoryMetadata;
if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) {
Settings updatedSettings = Settings.builder()
.put(newRepositoryMetadata.settings())
.put(finalRepositoryMetadata.settings())
.put(SYSTEM_REPOSITORY_SETTING.getKey(), true)
.build();
updatedRepositoryMetadata = new RepositoryMetadata(
newRepositoryMetadata.name(),
newRepositoryMetadata.type(),
finalRepositoryMetadata.name(),
finalRepositoryMetadata.type(),
updatedSettings,
newRepositoryMetadata.cryptoMetadata()
finalRepositoryMetadata.cryptoMetadata()
);
}
if (repositoryMetadata.name().equals(updatedRepositoryMetadata.name())) {
Expand Down Expand Up @@ -481,7 +501,8 @@ public void applyClusterState(ClusterChangedEvent event) {
if (previousMetadata.type().equals(repositoryMetadata.type()) == false
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
// Previous version is different from the version in settings
if (repository.isSystemRepository() && repository.isReloadable()) {
if ((repository.isSystemRepository() && repository.isReloadable())
|| repository.isReloadableSettings(repositoryMetadata)) {
logger.debug(
"updating repository [{}] in-place to use new metadata [{}]",
repositoryMetadata.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@
return false;
}

default boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) {
return false;

Check warning on line 606 in server/src/main/java/org/opensearch/repositories/Repository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/Repository.java#L606

Added line #L606 was not covered by tests
}

/**
* Reload the repository inplace
*/
Expand Down
Loading
Loading