Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
- Add rate limiting for merges with cluster and index settings ([#19309](https://github.com/opensearch-project/OpenSearch/pull/19309))
- Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
- Add rate limiting for merges with cluster and index settings ([#19309](https://github.com/opensearch-project/OpenSearch/pull/19309))
- Enable skip_list for @timestamp field or index sort field by default([#19480](https://github.com/opensearch-project/OpenSearch/pull/19480))
- Implement GRPC MatchPhrase, MultiMatch queries ([#19449](https://github.com/opensearch-project/OpenSearch/pull/19449))
- Optimize gRPC transport thread management for improved throughput ([#19278](https://github.com/opensearch-project/OpenSearch/pull/19278))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
Expand Down Expand Up @@ -519,6 +520,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY,
MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MergeSchedulerConfig.AUTO_THROTTLE_SETTING,
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING,
MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
IndexMetadata.SETTING_INDEX_VERSION_CREATED,
IndexMetadata.SETTING_INDEX_CREATION_DATE,
IndexMetadata.INDEX_UUID_SETTING,
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,18 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce);
}

/**
* Called whenever the cluster level {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} changes.
* The change is only applied if the index doesn't have its own explicit force merge MB per sec setting.
*
* @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
*/
public void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
if (!MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings())) {
indexSettings.getMergeSchedulerConfig().setMaxForceMergeMBPerSec(maxForceMergeMBPerSec);
}
}

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mergeSchedulerConfig::setMaxThreadAndMergeCount
);
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
scopedSettings.addSettingsUpdateConsumer(
MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
value -> mergeSchedulerConfig.updateMaxForceMergeMBPerSec(this)
);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
scopedSettings.addSettingsUpdateConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@
* unluckily suddenly requires a large merge will see that merge aggressively
* throttled, while an application doing heavy indexing will see the throttle
* move higher to allow merges to keep up with ongoing indexing.
*
* <li><code>index.merge.scheduler.max_force_merge_mb_per_sec</code>:
* <p>
* Controls the rate limiting for forced merges in MB per second at the index level.
* The default value of Double.POSITIVE_INFINITY means no rate limiting is applied.
* Setting a finite positive value will limit the throughput of forced merge operations
* to the specified rate. This setting takes precedence over the cluster-level setting.
*
* <li><code>cluster.merge.scheduler.max_force_merge_mb_per_sec</code>:
* <p>
* Controls the rate limiting for forced merges in MB per second at the cluster level.
* The default value of Double.POSITIVE_INFINITY means no rate limiting is applied.
* This setting is used as a fallback when the index-level setting is not specified.
* Index-level settings take precedence over this cluster-level setting.
* </ul>
*
* <p><b>Setting Precedence:</b>
* <ul>
* <li>If only index-level setting is specified: uses index value
* <li>If only cluster-level setting is specified: uses cluster value
* <li>If both settings are specified: uses index value (index takes precedence)
* <li>If neither setting is specified: uses default value (Double.POSITIVE_INFINITY)
* </ul>
*
* @opensearch.api
Expand Down Expand Up @@ -90,16 +112,35 @@ public final class MergeSchedulerConfig {
Property.Dynamic,
Property.IndexScope
);
public static final Setting<Double> MAX_FORCE_MERGE_MB_PER_SEC_SETTING = Setting.doubleSetting(
"index.merge.scheduler.max_force_merge_mb_per_sec",
Double.POSITIVE_INFINITY,
0.0d,
Double.POSITIVE_INFINITY,
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Double> CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING = Setting.doubleSetting(
"cluster.merge.scheduler.max_force_merge_mb_per_sec",
Double.POSITIVE_INFINITY,
0.0d,
Double.POSITIVE_INFINITY,
Property.Dynamic,
Property.NodeScope
);

private volatile boolean autoThrottle;
private volatile int maxThreadCount;
private volatile int maxMergeCount;
private volatile double maxForceMergeMBPerSec;

MergeSchedulerConfig(IndexSettings indexSettings) {
int maxThread = indexSettings.getValue(MAX_THREAD_COUNT_SETTING);
int maxMerge = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
setMaxThreadAndMergeCount(maxThread, maxMerge);
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
updateMaxForceMergeMBPerSec(indexSettings);
}

/**
Expand Down Expand Up @@ -151,4 +192,34 @@ void setMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) {
public int getMaxMergeCount() {
return maxMergeCount;
}

/**
* Returns the maximum force merge rate in MB per second.
* A value of Double.POSITIVE_INFINITY indicates no rate limiting.
*/
public double getMaxForceMergeMBPerSec() {
return maxForceMergeMBPerSec;
}

/**
* Sets the maximum force merge rate in MB per second.
* A value of Double.POSITIVE_INFINITY disables rate limiting.
*/
void setMaxForceMergeMBPerSec(double maxForceMergeMBPerSec) {
this.maxForceMergeMBPerSec = maxForceMergeMBPerSec;
}

/**
* Updates the maximum force merge rate based on index settings, with fallback to cluster settings.
* This method handles the case where an index-level setting is removed and should
* fall back to the cluster-level setting.
*/
public void updateMaxForceMergeMBPerSec(IndexSettings indexSettings) {
boolean hasIndexSetting = MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings());
if (hasIndexSetting) {
this.maxForceMergeMBPerSec = indexSettings.getValue(MAX_FORCE_MERGE_MB_PER_SEC_SETTING);
} else {
this.maxForceMergeMBPerSec = CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.get(indexSettings.getNodeSettings());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
Expand All @@ -64,7 +63,7 @@
class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {

protected final Logger logger;
private final Settings indexSettings;
private final IndexSettings indexSettings;
private final ShardId shardId;

private final MeanMetric totalMerges = new MeanMetric();
Expand All @@ -83,7 +82,7 @@ class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
OpenSearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
this.config = indexSettings.getMergeSchedulerConfig();
this.shardId = shardId;
this.indexSettings = indexSettings.getSettings();
this.indexSettings = indexSettings;
this.logger = Loggers.getLogger(getClass(), shardId);
refreshConfig();
}
Expand Down Expand Up @@ -192,7 +191,10 @@ protected boolean maybeStall(MergeSource mergeSource) {
protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(mergeSource, merge);
thread.setName(
OpenSearchExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName())
OpenSearchExecutors.threadName(
indexSettings.getSettings(),
"[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName()
)
);
return thread;
}
Expand All @@ -215,6 +217,9 @@ MergeStats stats() {
}

void refreshConfig() {
// Update the config with current index settings before using it
config.updateMaxForceMergeMBPerSec(indexSettings);

if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
}
Expand All @@ -224,6 +229,26 @@ void refreshConfig() {
} else if (config.isAutoThrottle() == false && isEnabled) {
disableAutoIOThrottle();
}
applyMergeRateLimit();
}

/**
* Applies the merge rate limit based on the current configuration.
* If the setting value is Double.POSITIVE_INFINITY, rate limiting is disabled.
* Otherwise, auto-throttling is enabled as the available rate limiting mechanism.
*/
private void applyMergeRateLimit() {
double maxForceMergeMBPerSec = config.getMaxForceMergeMBPerSec();
if (maxForceMergeMBPerSec != getForceMergeMBPerSec()) {
logger.info(
"[{}][{}] updating force merge rate limit from [{}] to [{}] MB/sec",
shardId.getIndexName(),
shardId.id(),
super.getForceMergeMBPerSec(),
maxForceMergeMBPerSec
);
setForceMergeMBPerSec(maxForceMergeMBPerSec);
}
}

}
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.cache.request.ShardRequestCache;
Expand Down Expand Up @@ -591,6 +592,11 @@ protected void closeInternal() {
this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
this::onClusterLevelForceMergeMBPerSecUpdate
);
}

@InternalApi
Expand Down Expand Up @@ -694,6 +700,20 @@ private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) {
}
}

/**
* The changes to dynamic cluster setting {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} needs to be updated. This
* method gets called whenever the setting changes. We notify the change to all IndexService instances that are created on this node
* so they can update their merge schedulers accordingly.
*
* @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
*/
private void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
for (Map.Entry<String, IndexService> entry : indices.entrySet()) {
IndexService indexService = entry.getValue();
indexService.onClusterLevelForceMergeMBPerSecUpdate(maxForceMergeMBPerSec);
}
}

private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTranslogFactorySupplier(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
Expand Down
Loading
Loading