Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)

### Dependencies
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testSimpleWorkflow() {

public void testMissingUri() {
try {
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", Settings.builder());
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo1", "hdfs", Settings.builder());
fail();
} catch (RepositoryException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
Expand Down Expand Up @@ -193,7 +193,7 @@ public void testNonHdfsUri() {
public void testPathSpecifiedInHdfs() {
try {
Settings.Builder settings = Settings.builder().put("uri", "hdfs:///some/path");
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings);
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo2", "hdfs", settings);
fail();
} catch (RepositoryException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
Expand All @@ -204,7 +204,7 @@ public void testPathSpecifiedInHdfs() {
public void testMissingPath() {
try {
Settings.Builder settings = Settings.builder().put("uri", "hdfs:///");
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo", "hdfs", settings);
OpenSearchIntegTestCase.putRepository(client().admin().cluster(), "test-repo3", "hdfs", settings);
fail();
} catch (RepositoryException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -42,12 +43,18 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Client;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_REMOTE_UPLOAD_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -138,10 +145,11 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted

// create repository
final String repositoryName = "test-repo";
Path path = randomRepoPath();
Settings.Builder repoSettings = Settings.builder()
.put("location", randomRepoPath())
.put("max_snapshot_bytes_per_sec", "10mb")
.put("max_restore_bytes_per_sec", "10mb");
.put("location", path)
.put(MAX_SNAPSHOT_BYTES_PER_SEC, "10mb")
.put(MAX_RESTORE_BYTES_PER_SEC, "10mb");
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
Expand Down Expand Up @@ -176,7 +184,7 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted

try {
logger.info("--> begin to reset repository");
repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb");
repoSettings = Settings.builder().put("location", randomRepoPath()).put(MAX_SNAPSHOT_BYTES_PER_SEC, "300mb");
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
Expand All @@ -194,4 +202,121 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted

thread.join();
}

public void testAdjustBytesPerSecSettingForSnapAndRestore() {
final InternalTestCluster cluster = internalCluster();
final RepositoriesService repositoriesService = cluster.getDataOrClusterManagerNodeInstances(RepositoriesService.class)
.iterator()
.next();

// create repository
final String repositoryName = "test-repo1";
long rateBytes = 200000;
Path path = randomRepoPath();
Settings.Builder repoSettings = Settings.builder()
.put("location", path)
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);

FsRepository repository = (FsRepository) repositoriesService.repository(repositoryName);
RateLimiter snapshotRateLimiter = repository.snapshotRateLimiter();
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter restoreRateLimiter = repository.restoreRateLimiter();
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter remoteUploadRateLimiter = repository.remoteUploadRateLimiter();
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter remoteUploadLowPriorityRateLimiter = repository.remoteUploadLowPriorityRateLimiter();
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
RateLimiter remoteDownloadRateLimiter = repository.remoteDownloadRateLimiter();
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));

// adjust all the reloadable settings
{
rateBytes = rateBytes / 2;
repoSettings = Settings.builder()
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
assertThat(newRepository, sameInstance(repository));
snapshotRateLimiter = newRepository.snapshotRateLimiter();
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
restoreRateLimiter = newRepository.restoreRateLimiter();
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter();
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter();
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter();
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
}

// In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings
{
long newRateBytes = rateBytes / 2;
repoSettings = Settings.builder()
.put("location", path)
.put(MAX_SNAPSHOT_BYTES_PER_SEC, (newRateBytes + "b"))
.put(MAX_RESTORE_BYTES_PER_SEC, (newRateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
assertThat(newRepository, sameInstance(repository));
snapshotRateLimiter = newRepository.snapshotRateLimiter();
assertThat(snapshotRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024)));
restoreRateLimiter = newRepository.restoreRateLimiter();
assertThat(restoreRateLimiter.getMBPerSec(), equalTo((double) newRateBytes / (1024 * 1024)));
remoteUploadRateLimiter = newRepository.remoteUploadRateLimiter();
assertThat(remoteUploadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteUploadLowPriorityRateLimiter = newRepository.remoteUploadLowPriorityRateLimiter();
assertThat(remoteUploadLowPriorityRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
remoteDownloadRateLimiter = newRepository.remoteDownloadRateLimiter();
assertThat(remoteDownloadRateLimiter.getMBPerSec(), equalTo((double) rateBytes / (1024 * 1024)));
}

// the new settings are not all equal to the old settings, so the repository will be not reloaded
{
rateBytes = rateBytes / 2;
repoSettings = Settings.builder()
.put("location", path)
.put("io_buffer_size", "8mb")
.put(MAX_RESTORE_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC, (rateBytes + "b"))
.put(MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC, (rateBytes + "b"));
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
FsRepository newRepository = (FsRepository) repositoriesService.repository(repositoryName);
assertNotEquals(newRepository, repository);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.AbstractDisruptionTestCase;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -154,6 +155,7 @@ public void testSettingsUpdateFailWhenCreateSnapshotInProgress() throws Exceptio
Thread.sleep(1000); // Wait for the snapshot to start
assertFalse(createSlowFuture.isDone()); // Ensure the snapshot is still in progress
// Attempt to update the repository settings while the snapshot is in progress
settings.put("chunk_size", 2000, ByteSizeUnit.BYTES);
IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", settings));
// Verify that the update fails with an appropriate exception
assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage());
Expand All @@ -180,10 +182,9 @@ public void testSettingsUpdateFailWhenDeleteSnapshotInProgress() throws Interrup
Thread.sleep(1000); // Wait for the delete operation to start
assertFalse(future.isDone()); // Ensure the delete operation is still in progress
// Attempt to update the repository settings while the delete operation is in progress
IllegalStateException ex = assertThrows(
IllegalStateException.class,
() -> updateRepository(repoName, "mock", randomRepositorySettings())
);
Settings.Builder newSettings = randomRepositorySettings();
newSettings.put("chunk_size", 2000, ByteSizeUnit.BYTES);
IllegalStateException ex = assertThrows(IllegalStateException.class, () -> updateRepository(repoName, "mock", newSettings));
// Verify that the update fails with an appropriate exception
assertEquals("trying to modify or unregister repository that is currently used", ex.getMessage());
unblockNode(repoName, clusterManagerName); // Unblock the delete operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public RepositoriesService(
public void registerOrUpdateRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";

final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
request.name(),
request.type(),
request.settings(),
Expand Down Expand Up @@ -206,14 +206,32 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
registrationListener = listener;
}

// Trying to create the new repository on cluster-manager to make sure it works
try {
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
Repository currentRepository = repositories.get(request.name());
boolean isReloadableSettings = currentRepository != null && currentRepository.isReloadableSettings(newRepositoryMetadata);

if (isReloadableSettings) {
// We are reloading the repository, so we need to preserve the old settings in the new repository metadata
Settings updatedSettings = Settings.builder()
.put(currentRepository.getMetadata().settings())
.put(newRepositoryMetadata.settings())
.build();
newRepositoryMetadata = new RepositoryMetadata(
newRepositoryMetadata.name(),
newRepositoryMetadata.type(),
updatedSettings,
newRepositoryMetadata.cryptoMetadata()
);
} else {
// Trying to create the new repository on cluster-manager to make sure it works
try {
closeRepository(createRepository(newRepositoryMetadata, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
}
}

final RepositoryMetadata finalRepositoryMetadata = newRepositoryMetadata;
clusterService.submitStateUpdateTask(
"put_repository [" + request.name() + "]",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
Expand All @@ -224,7 +242,9 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryNotInUse(currentState, request.name());
if (isReloadableSettings == false) {
ensureRepositoryNotInUse(currentState, request.name());
}
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
Expand All @@ -245,17 +265,17 @@ public ClusterState execute(ClusterState currentState) {
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
RepositoryMetadata updatedRepositoryMetadata = newRepositoryMetadata;
RepositoryMetadata updatedRepositoryMetadata = finalRepositoryMetadata;
if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) {
Settings updatedSettings = Settings.builder()
.put(newRepositoryMetadata.settings())
.put(finalRepositoryMetadata.settings())
.put(SYSTEM_REPOSITORY_SETTING.getKey(), true)
.build();
updatedRepositoryMetadata = new RepositoryMetadata(
newRepositoryMetadata.name(),
newRepositoryMetadata.type(),
finalRepositoryMetadata.name(),
finalRepositoryMetadata.type(),
updatedSettings,
newRepositoryMetadata.cryptoMetadata()
finalRepositoryMetadata.cryptoMetadata()
);
}
if (repositoryMetadata.name().equals(updatedRepositoryMetadata.name())) {
Expand Down Expand Up @@ -481,7 +501,8 @@ public void applyClusterState(ClusterChangedEvent event) {
if (previousMetadata.type().equals(repositoryMetadata.type()) == false
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
// Previous version is different from the version in settings
if (repository.isSystemRepository() && repository.isReloadable()) {
if ((repository.isSystemRepository() && repository.isReloadable())
|| repository.isReloadableSettings(repositoryMetadata)) {
logger.debug(
"updating repository [{}] in-place to use new metadata [{}]",
repositoryMetadata.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@
return false;
}

default boolean isReloadableSettings(RepositoryMetadata newRepositoryMetadata) {
return false;

Check warning on line 606 in server/src/main/java/org/opensearch/repositories/Repository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/Repository.java#L606

Added line #L606 was not covered by tests
}

/**
* Reload the repository inplace
*/
Expand Down
Loading
Loading