Skip to content

Commit

Permalink
Add consumers to remote store based index settings (#14764)
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 authored Jul 16, 2024
1 parent d33d24e commit ba9bdac
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
Expand All @@ -21,12 +23,17 @@
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.common.Priority;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -261,4 +268,13 @@ public ClusterHealthStatus waitForRelocation(TimeValue t) {
}
return actionGet.getStatus();
}

protected IndexShard getIndexShard(String dataNode, String indexName) throws ExecutionException, InterruptedException {
String clusterManagerName = internalCluster().getClusterManagerName();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID);
IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
return indexService.getShard(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -216,4 +217,12 @@ public void testEndToEndRemoteMigration() throws Exception {
asyncIndexingService.getIndexedDocs()
);
}

public void testRemoteSettingPropagatedToIndexShardAfterMigration() throws Exception {
testEndToEndRemoteMigration();
IndexShard indexShard = getIndexShard(primaryNodeName("test"), "test");
assertTrue(indexShard.indexSettings().isRemoteStoreEnabled());
assertEquals(MigrationBaseTestCase.REPOSITORY_NAME, indexShard.indexSettings().getRemoteStoreRepository());
assertEquals(MigrationBaseTestCase.REPOSITORY_2_NAME, indexShard.indexSettings().getRemoteStoreTranslogRepository());
}
}
27 changes: 24 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -732,11 +732,11 @@ public static IndexMergePolicy fromString(String text) {
private final Settings nodeSettings;
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private volatile boolean isRemoteStoreEnabled;
private final boolean isStoreLocalityPartial;
private volatile TimeValue remoteTranslogUploadBufferInterval;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private volatile String remoteStoreTranslogRepository;
private volatile String remoteStoreRepository;
private int remoteTranslogKeepExtraGen;
private Version extendedCompatibilitySnapshotVersion;

Expand Down Expand Up @@ -1132,6 +1132,15 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this::setDocIdFuzzySetFalsePositiveProbability
);
scopedSettings.addSettingsUpdateConsumer(ALLOW_DERIVED_FIELDS, this::setAllowDerivedField);
scopedSettings.addSettingsUpdateConsumer(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, this::setRemoteStoreEnabled);
scopedSettings.addSettingsUpdateConsumer(
IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING,
this::setRemoteStoreRepository
);
scopedSettings.addSettingsUpdateConsumer(
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
this::setRemoteStoreTranslogRepository
);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -1950,4 +1959,16 @@ public RemoteStorePathStrategy getRemoteStorePathStrategy() {
public boolean isTranslogMetadataEnabled() {
return isTranslogMetadataEnabled;
}

public void setRemoteStoreEnabled(boolean isRemoteStoreEnabled) {
this.isRemoteStoreEnabled = isRemoteStoreEnabled;
}

public void setRemoteStoreRepository(String remoteStoreRepository) {
this.remoteStoreRepository = remoteStoreRepository;
}

public void setRemoteStoreTranslogRepository(String remoteStoreTranslogRepository) {
this.remoteStoreTranslogRepository = remoteStoreTranslogRepository;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2678,7 +2678,7 @@ public void snapshotRemoteStoreIndexShard(
final ShardId shardId = store.shardId();
try {
final String generation = snapshotStatus.generation();
logger.info("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
logger.info("[{}] [{}] shallow copy snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);

long indexTotalFileSize = 0;
Expand Down

0 comments on commit ba9bdac

Please sign in to comment.