diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6641618e6a890..6563075146610 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,7 +32,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
+- Add rate limiting for merges with cluster and index settings ([#19309](https://github.com/opensearch-project/OpenSearch/pull/19309))
- Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
- Enable skip_list for @timestamp field or index sort field by default([#19480](https://github.com/opensearch-project/OpenSearch/pull/19480))
- Implement GRPC MatchPhrase, MultiMatch queries ([#19449](https://github.com/opensearch-project/OpenSearch/pull/19449))
- Optimize gRPC transport thread management for improved throughput ([#19278](https://github.com/opensearch-project/OpenSearch/pull/19278))
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 7fe3cde1e23f2..e5b7d7737bb08 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -113,6 +113,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
+import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
@@ -519,6 +520,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY,
+ MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING,
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 8a5eafef4a10a..7bbcdec25ce12 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -90,6 +90,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MergeSchedulerConfig.AUTO_THROTTLE_SETTING,
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING,
+ MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
IndexMetadata.SETTING_INDEX_VERSION_CREATED,
IndexMetadata.SETTING_INDEX_CREATION_DATE,
IndexMetadata.INDEX_UUID_SETTING,
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index 22441df923bf8..6612ea732942d 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -1235,6 +1235,18 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce);
}
+ /**
+ * Called whenever the cluster level {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} changes.
+ * The change is only applied if the index doesn't have its own explicit force merge MB per sec setting.
+ *
+ * @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
+ */
+ public void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
+ if (!MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings())) {
+ indexSettings.getMergeSchedulerConfig().setMaxForceMergeMBPerSec(maxForceMergeMBPerSec);
+ }
+ }
+
/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index a10f9d8152a79..bf3fef3d83caf 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -1178,6 +1178,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mergeSchedulerConfig::setMaxThreadAndMergeCount
);
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
+ scopedSettings.addSettingsUpdateConsumer(
+ MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
+ value -> mergeSchedulerConfig.updateMaxForceMergeMBPerSec(this)
+ );
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
scopedSettings.addSettingsUpdateConsumer(
diff --git a/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java b/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java
index a93a362a70c78..bbd375dfd4375 100644
--- a/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java
+++ b/server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java
@@ -63,6 +63,28 @@
* unluckily suddenly requires a large merge will see that merge aggressively
* throttled, while an application doing heavy indexing will see the throttle
* move higher to allow merges to keep up with ongoing indexing.
+ *
+ * <p><code>index.merge.scheduler.max_force_merge_mb_per_sec</code>:
+ *
+ * Controls the rate limiting for forced merges in MB per second at the index level.
+ * The default value of Double.POSITIVE_INFINITY means no rate limiting is applied.
+ * Setting a finite positive value will limit the throughput of forced merge operations
+ * to the specified rate. This setting takes precedence over the cluster-level setting.
+ *
+ * <p><code>cluster.merge.scheduler.max_force_merge_mb_per_sec</code>:
+ *
+ * Controls the rate limiting for forced merges in MB per second at the cluster level.
+ * The default value of Double.POSITIVE_INFINITY means no rate limiting is applied.
+ * This setting is used as a fallback when the index-level setting is not specified.
+ * Index-level settings take precedence over this cluster-level setting.
+ *
+ * <p>Setting Precedence:
+ * <ul>
+ * <li> If only the index-level setting is specified: uses the index value
+ * <li> If only the cluster-level setting is specified: uses the cluster value
+ * <li> If both settings are specified: uses the index value (index takes precedence)
+ * <li> If neither setting is specified: uses the default value (Double.POSITIVE_INFINITY)
+ * </ul>
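+ *
+ * <p>As an illustration only ({@code my-index} below is a placeholder index name), both limits are dynamic
+ * and can be updated through the settings APIs, for example:
+ * <pre>
+ * PUT /my-index/_settings
+ * { "index.merge.scheduler.max_force_merge_mb_per_sec": 50 }
+ *
+ * PUT /_cluster/settings
+ * { "persistent": { "cluster.merge.scheduler.max_force_merge_mb_per_sec": 100 } }
+ * </pre>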
*
*
* @opensearch.api
@@ -90,16 +112,35 @@ public final class MergeSchedulerConfig {
Property.Dynamic,
Property.IndexScope
);
+ public static final Setting<Double> MAX_FORCE_MERGE_MB_PER_SEC_SETTING = Setting.doubleSetting(
+ "index.merge.scheduler.max_force_merge_mb_per_sec",
+ Double.POSITIVE_INFINITY,
+ 0.0d,
+ Double.POSITIVE_INFINITY,
+ Property.Dynamic,
+ Property.IndexScope
+ );
+
+ public static final Setting<Double> CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING = Setting.doubleSetting(
+ "cluster.merge.scheduler.max_force_merge_mb_per_sec",
+ Double.POSITIVE_INFINITY,
+ 0.0d,
+ Double.POSITIVE_INFINITY,
+ Property.Dynamic,
+ Property.NodeScope
+ );
private volatile boolean autoThrottle;
private volatile int maxThreadCount;
private volatile int maxMergeCount;
+ private volatile double maxForceMergeMBPerSec;
MergeSchedulerConfig(IndexSettings indexSettings) {
int maxThread = indexSettings.getValue(MAX_THREAD_COUNT_SETTING);
int maxMerge = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
setMaxThreadAndMergeCount(maxThread, maxMerge);
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
+ updateMaxForceMergeMBPerSec(indexSettings);
}
/**
@@ -151,4 +192,34 @@ void setMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) {
public int getMaxMergeCount() {
return maxMergeCount;
}
+
+ /**
+ * Returns the maximum force merge rate in MB per second.
+ * A value of Double.POSITIVE_INFINITY indicates no rate limiting.
+ */
+ public double getMaxForceMergeMBPerSec() {
+ return maxForceMergeMBPerSec;
+ }
+
+ /**
+ * Sets the maximum force merge rate in MB per second.
+ * A value of Double.POSITIVE_INFINITY disables rate limiting.
+ */
+ void setMaxForceMergeMBPerSec(double maxForceMergeMBPerSec) {
+ this.maxForceMergeMBPerSec = maxForceMergeMBPerSec;
+ }
+
+ /**
+ * Updates the maximum force merge rate based on index settings, with fallback to cluster settings.
+ * This method handles the case where an index-level setting is removed and should
+ * fall back to the cluster-level setting.
+ */
+ public void updateMaxForceMergeMBPerSec(IndexSettings indexSettings) {
+ boolean hasIndexSetting = MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings());
+ if (hasIndexSetting) {
+ this.maxForceMergeMBPerSec = indexSettings.getValue(MAX_FORCE_MERGE_MB_PER_SEC_SETTING);
+ } else {
+ this.maxForceMergeMBPerSec = CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.get(indexSettings.getNodeSettings());
+ }
+ }
}
diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java
index 8d62bb8d8f920..a9c46759b96f8 100644
--- a/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java
+++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java
@@ -39,7 +39,6 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
-import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
@@ -64,7 +63,7 @@
class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
protected final Logger logger;
- private final Settings indexSettings;
+ private final IndexSettings indexSettings;
private final ShardId shardId;
private final MeanMetric totalMerges = new MeanMetric();
@@ -83,7 +82,7 @@ class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
OpenSearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
this.config = indexSettings.getMergeSchedulerConfig();
this.shardId = shardId;
- this.indexSettings = indexSettings.getSettings();
+ this.indexSettings = indexSettings;
this.logger = Loggers.getLogger(getClass(), shardId);
refreshConfig();
}
@@ -192,7 +191,10 @@ protected boolean maybeStall(MergeSource mergeSource) {
protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(mergeSource, merge);
thread.setName(
- OpenSearchExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName())
+ OpenSearchExecutors.threadName(
+ indexSettings.getSettings(),
+ "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName()
+ )
);
return thread;
}
@@ -215,6 +217,9 @@ MergeStats stats() {
}
void refreshConfig() {
+ // Update the config with current index settings before using it
+ config.updateMaxForceMergeMBPerSec(indexSettings);
+
if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
}
@@ -224,6 +229,26 @@ void refreshConfig() {
} else if (config.isAutoThrottle() == false && isEnabled) {
disableAutoIOThrottle();
}
+ applyMergeRateLimit();
+ }
+
+ /**
+ * Applies the merge rate limit based on the current configuration.
+ * If the setting value is Double.POSITIVE_INFINITY, rate limiting is disabled.
+ * Otherwise, auto-throttling is enabled as the available rate limiting mechanism.
+ */
+ private void applyMergeRateLimit() {
+ double maxForceMergeMBPerSec = config.getMaxForceMergeMBPerSec();
+ if (maxForceMergeMBPerSec != getForceMergeMBPerSec()) {
+ logger.info(
+ "[{}][{}] updating force merge rate limit from [{}] to [{}] MB/sec",
+ shardId.getIndexName(),
+ shardId.id(),
+ getForceMergeMBPerSec(),
+ maxForceMergeMBPerSec
+ );
+ setForceMergeMBPerSec(maxForceMergeMBPerSec);
+ }
}
}
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 1d74f6d7d02da..667eb9edb65f7 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -106,6 +106,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IngestionConsumerFactory;
+import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.cache.request.ShardRequestCache;
@@ -591,6 +592,11 @@ protected void closeInternal() {
this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate);
+ clusterService.getClusterSettings()
+ .addSettingsUpdateConsumer(
+ MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
+ this::onClusterLevelForceMergeMBPerSecUpdate
+ );
}
@InternalApi
@@ -694,6 +700,20 @@ private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) {
}
}
+ /**
+ * Handles updates to the dynamic cluster setting {@code cluster.merge.scheduler.max_force_merge_mb_per_sec}. This
+ * method is called whenever the setting changes and notifies all IndexService instances created on this node so
+ * that they can update their merge schedulers accordingly.
+ *
+ * @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
+ */
+ private void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
+ for (Map.Entry<String, IndexService> entry : indices.entrySet()) {
+ IndexService indexService = entry.getValue();
+ indexService.onClusterLevelForceMergeMBPerSecUpdate(maxForceMergeMBPerSec);
+ }
+ }
+
private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTranslogFactorySupplier(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
diff --git a/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java b/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java
new file mode 100644
index 0000000000000..752c21c5e108c
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java
@@ -0,0 +1,218 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.engine;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.filter.RegexFilter;
+import org.opensearch.Version;
+import org.opensearch.common.logging.Loggers;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.test.OpenSearchTestCase;
+
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
+import static org.opensearch.index.IndexSettingsTests.newIndexMeta;
+import static org.opensearch.index.MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING;
+import static org.opensearch.index.MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests for merge rate limiting functionality in OpenSearchConcurrentMergeScheduler
+ */
+public class MergeRateLimitingTests extends OpenSearchTestCase {
+
+ private static class MockAppender extends AbstractAppender {
+ public boolean sawRateLimitUpdate;
+ public String lastRateLimitMessage;
+
+ MockAppender(final String name) throws IllegalAccessException {
+ super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null);
+ }
+
+ @Override
+ public void append(LogEvent event) {
+ String message = event.getMessage().getFormattedMessage();
+ if (event.getLevel() == Level.INFO && message.contains("updating force merge rate limit")) {
+ sawRateLimitUpdate = true;
+ lastRateLimitMessage = message;
+ }
+ }
+
+ @Override
+ public boolean ignoreExceptions() {
+ return false;
+ }
+
+ public void reset() {
+ sawRateLimitUpdate = false;
+ lastRateLimitMessage = null;
+ }
+ }
+
+ /**
+ * Test that index-level settings take precedence over cluster-level settings
+ */
+ public void testSettingPrecedence() {
+ // Test with only cluster-level setting
+ Settings nodeSettings = Settings.builder().put(CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "75.0").build();
+
+ Settings.Builder indexBuilder = Settings.builder()
+ .put(SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(SETTING_NUMBER_OF_SHARDS, "1")
+ .put(SETTING_NUMBER_OF_REPLICAS, "0");
+
+ IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", indexBuilder.build()), nodeSettings);
+ ShardId shardId = new ShardId("test_index", "test_uuid", 0);
+
+ OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
+
+ // Should use cluster-level setting
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(75.0));
+
+ // Test with both index and cluster-level settings - index should take precedence
+ indexBuilder.put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "25.0");
+ indexSettings = new IndexSettings(newIndexMeta("test_index", indexBuilder.build()), nodeSettings);
+ scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
+
+ // Should use index-level setting
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(25.0));
+ }
+
+ /**
+ * Test that disabled rate limiting (Double.POSITIVE_INFINITY) works correctly
+ */
+ public void testDisabledRateLimiting() {
+ Settings.Builder builder = Settings.builder()
+ .put(SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(SETTING_NUMBER_OF_SHARDS, "1")
+ .put(SETTING_NUMBER_OF_REPLICAS, "0")
+ .put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), Double.POSITIVE_INFINITY);
+
+ IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), Settings.EMPTY);
+ ShardId shardId = new ShardId("test_index", "test_uuid", 0);
+
+ OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
+
+ // Should have no rate limiting
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(Double.POSITIVE_INFINITY));
+ }
+
+ /**
+ * Test that rate limiting configuration changes are applied when scheduler is refreshed
+ */
+ public void testDynamicRateLimitUpdates() throws Exception {
+ MockAppender mockAppender = new MockAppender("testDynamicRateLimitUpdates");
+ mockAppender.start();
+ final Logger logger = LogManager.getLogger(OpenSearchConcurrentMergeScheduler.class);
+ Loggers.addAppender(logger, mockAppender);
+ Loggers.setLevel(logger, Level.INFO);
+
+ try {
+ Settings.Builder builder = Settings.builder()
+ .put(SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(SETTING_NUMBER_OF_SHARDS, "1")
+ .put(SETTING_NUMBER_OF_REPLICAS, "0")
+ .put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "10.0");
+
+ IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), Settings.EMPTY);
+ ShardId shardId = new ShardId("test_index", "test_uuid", 0);
+
+ OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(10.0));
+
+ // Update to a different rate limit
+ mockAppender.reset();
+ builder.put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "20.0");
+ indexSettings.updateIndexMetadata(newIndexMeta("test_index", builder.build()));
+ scheduler.refreshConfig();
+
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(20.0));
+ assertTrue("Should log rate limit update", mockAppender.sawRateLimitUpdate);
+
+ // Update to disable rate limiting
+ mockAppender.reset();
+ builder.put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), Double.POSITIVE_INFINITY);
+ indexSettings.updateIndexMetadata(newIndexMeta("test_index", builder.build()));
+ scheduler.refreshConfig();
+
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(Double.POSITIVE_INFINITY));
+ assertTrue("Should log rate limit update", mockAppender.sawRateLimitUpdate);
+
+ // Re-enable with a new rate
+ mockAppender.reset();
+ builder.put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "30.0");
+ indexSettings.updateIndexMetadata(newIndexMeta("test_index", builder.build()));
+ scheduler.refreshConfig();
+
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(30.0));
+ assertTrue("Should log rate limit update", mockAppender.sawRateLimitUpdate);
+
+ } finally {
+ Loggers.removeAppender(logger, mockAppender);
+ mockAppender.stop();
+ Loggers.setLevel(logger, (Level) null);
+ }
+ }
+
+ /**
+ * Test that when index-level setting is removed, scheduler falls back to cluster-level setting
+ */
+ public void testFallbackToClusterSettingWhenIndexSettingRemoved() throws Exception {
+ MockAppender mockAppender = new MockAppender("testFallbackToClusterSettingWhenIndexSettingRemoved");
+ mockAppender.start();
+ final Logger logger = LogManager.getLogger(OpenSearchConcurrentMergeScheduler.class);
+ Loggers.addAppender(logger, mockAppender);
+ Loggers.setLevel(logger, Level.INFO);
+
+ try {
+ Settings nodeSettings = Settings.builder().put(CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "50.0").build();
+
+ // Start with index-level setting that overrides cluster setting
+ Settings.Builder builder = Settings.builder()
+ .put(SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(SETTING_NUMBER_OF_SHARDS, "1")
+ .put(SETTING_NUMBER_OF_REPLICAS, "0")
+ .put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "25.0");
+
+ IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), nodeSettings);
+ ShardId shardId = new ShardId("test_index", "test_uuid", 0);
+
+ OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
+
+ // Should initially use index-level setting
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(25.0));
+
+ // Remove the index-level setting by NOT setting MAX_FORCE_MERGE_MB_PER_SEC_SETTING - should trigger fallback to cluster setting
+ mockAppender.reset();
+ Settings.Builder newBuilder = Settings.builder()
+ .put(SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(SETTING_NUMBER_OF_SHARDS, "1")
+ .put(SETTING_NUMBER_OF_REPLICAS, "0");
+
+ indexSettings.updateIndexMetadata(newIndexMeta("test_index", newBuilder.build()));
+ scheduler.refreshConfig();
+
+ // Should now use cluster-level setting
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(50.0));
+ assertTrue("Should log rate limit update when falling back to cluster setting", mockAppender.sawRateLimitUpdate);
+
+ } finally {
+ Loggers.removeAppender(logger, mockAppender);
+ mockAppender.stop();
+ Loggers.setLevel(logger, (Level) null);
+ }
+ }
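+
+ /**
+ * Additional sketch reusing the same helpers as the tests above: when neither the index-level nor the
+ * cluster-level setting is specified, the scheduler should fall back to the default of
+ * Double.POSITIVE_INFINITY, i.e. no force merge rate limiting.
+ */
+ public void testDefaultWhenNeitherSettingSpecified() {
+ // No index-level or cluster-level force merge rate limit is configured here
+ Settings.Builder builder = Settings.builder()
+ .put(SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(SETTING_NUMBER_OF_SHARDS, "1")
+ .put(SETTING_NUMBER_OF_REPLICAS, "0");
+
+ IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), Settings.EMPTY);
+ ShardId shardId = new ShardId("test_index", "test_uuid", 0);
+
+ OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
+
+ // Neither setting is present, so the default (no rate limiting) applies
+ assertThat(scheduler.getForceMergeMBPerSec(), equalTo(Double.POSITIVE_INFINITY));
+ }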
+}