diff --git a/CHANGELOG.md b/CHANGELOG.md index bdd810dd8d50d..8059b96a515b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351)) - Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918)) - Add subdirectory-aware store module with recovery support ([#19132](https://github.com/opensearch-project/OpenSearch/pull/19132)) - +- Add a dynamic cluster setting to control the enablement of the merged segment warmer ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) ### Changed - Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998)) - IllegalArgumentException when scroll ID references a node not found in Cluster ([#19031](https://github.com/opensearch-project/OpenSearch/pull/19031)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java index 41e08690f22be..5ee8b1aa738fa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java @@ -24,6 +24,7 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.ConnectTransportException; @@ -45,7 +46,10 @@ public class MergedSegmentWarmerIT extends SegmentReplicationIT { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true) + .build(); } @Override diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java index 94f7b65f72c17..4895ee589f88b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java @@ -16,6 +16,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexSettings; import org.opensearch.index.TieredMergePolicyProvider; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -42,6 +43,7 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath)) + .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true) .build(); } @@ -62,13 +64,11 @@ public void testMergeSegmentWarmerRemote() throws Exception { final String node2 = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); - MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance( - TransportService.class, - node1 - ); - MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance( + + String primaryShardNode = findprimaryShardNode(INDEX_NAME); + MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance( TransportService.class, - node2 + primaryShardNode ); final CountDownLatch latch = new CountDownLatch(1); StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> { @@ -82,6 +82,8 @@ public void testMergeSegmentWarmerRemote() throws Exception { connection.sendRequest(requestId, action, request, options); }; + mockTransportServicePrimary.addSendBehavior(behavior); + for (int i = 0; i < 30; i++) { client().prepareIndex(INDEX_NAME) .setId(String.valueOf(i)) @@ -92,14 +94,10 @@ public void testMergeSegmentWarmerRemote() throws Exception { waitForSearchableDocs(30, node1, node2); - mockTransportServiceNode1.addSendBehavior(behavior); - mockTransportServiceNode2.addSendBehavior(behavior); - client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2)); waitForSegmentCount(INDEX_NAME, 2, logger); assertTrue(latch.await(10, TimeUnit.SECONDS)); - mockTransportServiceNode1.clearAllRules(); - mockTransportServiceNode2.clearAllRules(); + mockTransportServicePrimary.clearAllRules(); } public void testConcurrentMergeSegmentWarmerRemote() throws Exception { @@ -115,14 +113,13 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception { .build() ); ensureGreen(INDEX_NAME); - MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance( - TransportService.class, - node1 - ); - MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance( + + String primaryShardNode = findprimaryShardNode(INDEX_NAME); + MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance( TransportService.class, - node2 + primaryShardNode ); + CountDownLatch latch = new CountDownLatch(2); AtomicLong numInvocations = new AtomicLong(0); Set executingThreads = ConcurrentHashMap.newKeySet(); @@ -139,8 +136,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception { connection.sendRequest(requestId, action, request, options); }; - mockTransportServiceNode1.addSendBehavior(behavior); - mockTransportServiceNode2.addSendBehavior(behavior); + mockTransportServicePrimary.addSendBehavior(behavior); for (int i = 0; i < 30; i++) { client().prepareIndex(INDEX_NAME) @@ -158,8 +154,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception { assertTrue(executingThreads.size() > 1); // Verify concurrent execution by checking that multiple unique threads handled merge operations assertTrue(numInvocations.get() > 1); - mockTransportServiceNode1.clearAllRules(); - mockTransportServiceNode2.clearAllRules(); + mockTransportServicePrimary.clearAllRules(); } public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception { @@ -179,4 +174,60 @@ public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception { final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get(); assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size()); } + + public void testMergeSegmentWarmerWithWarmingDisabled() throws Exception { + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + String primaryNodeName = findprimaryShardNode(INDEX_NAME); + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), false).build() + ) + .get(); + + MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNodeName + ); + + CountDownLatch warmingLatch = new CountDownLatch(1); + StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> { + if (action.equals("indices:admin/remote_publish_merged_segment[r]")) { + warmingLatch.countDown(); // This should NOT happen + } + connection.sendRequest(requestId, action, request, options); + }; + + mockTransportServicePrimary.addSendBehavior(behavior); + + for (int i = 0; i < 30; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource("foo" + i, "bar" + i) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + } + + client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get(); + final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get(); + assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size()); + assertFalse("Warming should be skipped when disabled", warmingLatch.await(5, TimeUnit.SECONDS)); + mockTransportServicePrimary.clearAllRules(); + } + + /** + * Returns the node name for the node hosting the primary shard for index "indexName" + */ + private String findprimaryShardNode(String indexName) { + String nodeId = internalCluster().clusterService().state().routingTable().index(indexName).shard(0).primaryShard().currentNodeId(); + + return internalCluster().clusterService().state().nodes().get(nodeId).getName(); + + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3f954cd9f9c37..e4bd3177ada4e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -319,6 +319,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING, RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, diff --git a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java index 138c9c484c07d..29cbcdd04484e 100644 --- a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java +++ b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java @@ -51,6 +51,9 @@ public MergedSegmentWarmer( @Override public void warm(LeafReader leafReader) throws IOException { + if (shouldWarm() == false) { + return; + } // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader. assert leafReader instanceof SegmentReader; assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled(); @@ -72,4 +75,9 @@ public void warm(LeafReader leafReader) throws IOException { ); }); } + + // package-private for tests + boolean shouldWarm() { + return indexShard.getRecoverySettings().isMergedSegmentReplicationWarmerEnabled() == true; + } } diff --git a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java index 71b7d316790d8..5eda7a53c5660 100644 --- a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.IndexWriter; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.transport.TransportService; @@ -34,12 +35,15 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet } public IndexWriter.IndexReaderWarmer get(IndexShard shard) { - if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) { - return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard); - } else if (shard.indexSettings().isDocumentReplication()) { - // MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication, - // IndexWriter.IndexReaderWarmer should be null. + if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false + || shard.indexSettings().isDocumentReplication()) { + // MergedSegmentWarmerFactory#get is called by IndexShard#newEngineConfig on the initialization of a new indexShard and + // in cases of updates to shard state. + // 1. IndexWriter.IndexReaderWarmer should be null when IndexMetadata.INDEX_REPLICATION_TYPE_SETTING == ReplicationType.DOCUMENT + // 2. IndexWriter.IndexReaderWarmer should be null when the FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG == false return null; + } else if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) { + return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard); } // We just handle known cases and throw exception at the last. This will allow predictability on the IndexReaderWarmer behaviour. throw new IllegalStateException(shard.shardId() + " can't determine IndexReaderWarmer"); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index b8b572f8037ef..35cdfe278f4f0 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -40,8 +40,10 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Setting.Validator; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; @@ -75,6 +77,28 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Dynamic setting to enable the merged segment warming(pre-copy) feature, default: false + */ + public static final Setting INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING = Setting.boolSetting( + "indices.replication.merged_segment_warmer_enabled", + false, + new Validator() { + @Override + public void validate(Boolean value) { + if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false && value == true) { + throw new IllegalArgumentException( + "FeatureFlag " + + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG + + " must be enabled to set this property to true." + ); + } + } + }, + Property.Dynamic, + Property.NodeScope + ); + /** * Individual speed setting for merged segment replication, default -1B to reuse the setting of recovery. */ @@ -211,6 +235,7 @@ public class RecoverySettings { private volatile ByteSizeValue recoveryMaxBytesPerSec; private volatile ByteSizeValue replicationMaxBytesPerSec; + private volatile boolean mergedSegmentReplicationWarmerEnabled; private volatile ByteSizeValue mergedSegmentReplicationMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; @@ -250,6 +275,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); + this.mergedSegmentReplicationWarmerEnabled = INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.get(settings); this.mergedSegmentReplicationMaxBytesPerSec = INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); this.mergedSegmentReplicationTimeout = INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING.get(settings); replicationRateLimiter = getReplicationRateLimiter(replicationMaxBytesPerSec); @@ -261,6 +287,10 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer( + INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING, + this::setIndicesMergedSegmentReplicationWarmerEnabled + ); clusterSettings.addSettingsUpdateConsumer( INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setMergedSegmentReplicationMaxBytesPerSec @@ -442,4 +472,12 @@ private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStre this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams; } + public boolean isMergedSegmentReplicationWarmerEnabled() { + return mergedSegmentReplicationWarmerEnabled; + } + + public void setIndicesMergedSegmentReplicationWarmerEnabled(boolean mergedSegmentReplicationWarmerEnabled) { + this.mergedSegmentReplicationWarmerEnabled = mergedSegmentReplicationWarmerEnabled; + } + } diff --git a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerFactoryTests.java index 965bfdd5d0bed..ce84681b2b64e 100644 --- a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerFactoryTests.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; @@ -47,7 +48,14 @@ public void setUp() throws Exception { when(indexShard.shardId()).thenReturn(shardId); } - public void testGetWithSegmentReplicationEnabled() { + @Override + public void tearDown() throws Exception { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + super.tearDown(); + } + + public void testGetWithSegmentReplicationAndExperimentalFeatureFlagEnabled() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build()); IndexSettings indexSettings = createIndexSettings( false, Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() @@ -59,7 +67,19 @@ public void testGetWithSegmentReplicationEnabled() { assertTrue(warmer instanceof MergedSegmentWarmer); } - public void testGetWithRemoteStoreEnabled() { + public void testGetWithSegmentReplicationAndExperimentalFeatureFlagDisabled() { + IndexSettings indexSettings = createIndexSettings( + false, + Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() + ); + when(indexShard.indexSettings()).thenReturn(indexSettings); + IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard); + + assertNull(warmer); + } + + public void testGetWithRemoteStoreEnabledAndExperimentalFeatureFlagEnabled() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build()); IndexSettings indexSettings = createIndexSettings( true, Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() @@ -72,7 +92,32 @@ public void testGetWithRemoteStoreEnabled() { assertTrue(warmer instanceof MergedSegmentWarmer); } - public void testGetWithDocumentReplication() { + public void testGetWithRemoteStoreEnabledAndExperimentalFeatureFlagDisabled() { + IndexSettings indexSettings = createIndexSettings( + true, + Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() + ); + when(indexShard.indexSettings()).thenReturn(indexSettings); + + IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard); + + assertNull(warmer); + } + + public void testGetWithDocumentReplicationAndExperimentalFeatureFlagEnabled() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build()); + IndexSettings indexSettings = createIndexSettings( + false, + Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() + ); + + when(indexShard.indexSettings()).thenReturn(indexSettings); + IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard); + + assertNull(warmer); + } + + public void testGetWithDocumentReplicationAndExperimentalFeatureFlagDisabled() { IndexSettings indexSettings = createIndexSettings( false, Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() diff --git a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java new file mode 100644 index 0000000000000..6c2a4fd254d16 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MergedSegmentWarmerTests extends OpenSearchTestCase { + + IndexShard mockIndexShard; + MergedSegmentWarmer mergedSegmentWarmer; + + @Override + public void setUp() throws Exception { + super.setUp(); + mockIndexShard = mock(IndexShard.class); + when(mockIndexShard.shardId()).thenReturn(new ShardId(new Index("test-index", "_na_"), 0)); + mergedSegmentWarmer = new MergedSegmentWarmer(null, null, null, mockIndexShard); + } + + public void testShouldWarm_warmerEnabled() { + RecoverySettings mockRecoverySettings = mock(RecoverySettings.class); + when(mockRecoverySettings.isMergedSegmentReplicationWarmerEnabled()).thenReturn(true); + when(mockIndexShard.getRecoverySettings()).thenReturn(mockRecoverySettings); + + assertTrue( + "MergedSegmentWarmer#shouldWarm is expected to return true when merged segment warmer is enabled", + mergedSegmentWarmer.shouldWarm() + ); + } + + public void testShouldWarm_warmerDisabled() { + RecoverySettings mockRecoverySettings = mock(RecoverySettings.class); + when(mockRecoverySettings.isMergedSegmentReplicationWarmerEnabled()).thenReturn(false); + when(mockIndexShard.getRecoverySettings()).thenReturn(mockRecoverySettings); + + assertFalse( + "MergedSegmentWarmer#shouldWarm is expected to return false when merged segment warmer is disabled", + mergedSegmentWarmer.shouldWarm() + ); + } +} diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 794e14ab19943..a3f8a51ec2369 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -35,6 +35,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.test.OpenSearchTestCase; @@ -45,6 +46,12 @@ public class RecoverySettingsDynamicUpdateTests extends OpenSearchTestCase { private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterSettings); + @Override + public void tearDown() throws Exception { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + super.tearDown(); + } + public void testZeroBytesPerSecondIsNoRateLimit() { clusterSettings.applySettings( Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() @@ -179,4 +186,36 @@ public void testInternalActionRetryTimeout() { ); assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionRetryTimeout()); } + + public void testMergedSegmentReplicationWarmerEnabledSetting() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build()); + + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build() + ); + assertTrue(recoverySettings.isMergedSegmentReplicationWarmerEnabled()); + + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), false).build() + ); + assertFalse(recoverySettings.isMergedSegmentReplicationWarmerEnabled()); + } + + public void testMergedSegmentReplicationWarmerEnabledSettingInvalidUpdate() { + Exception e = assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build() + ) + ); + assertEquals( + "illegal value can't update [indices.replication.merged_segment_warmer_enabled] from [false] to [true]", + e.getMessage() + ); + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + assertEquals( + "FeatureFlag opensearch.experimental.feature.merged_segment_warmer.enabled must be enabled to set this property to true.", + e.getCause().getMessage() + ); + } }