diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e66b11a896e3..bfe82d06e2c8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189)) - Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908)) - Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817))) +- Added Auto Force Merge Manager to enhance hot to warm migration experience ([#18229](https://github.com/opensearch-project/OpenSearch/pull/18229)) - Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256)) - Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250))) - Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([18270](https://github.com/opensearch-project/OpenSearch/pull18270))) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java new file mode 100644 index 0000000000000..bd17d82c4d46e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java @@ -0,0 +1,247 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.autoforcemerge; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.node.Node; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.opensearch.common.util.concurrent.OpenSearchExecutors.NODE_PROCESSORS_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +public class AutoForceMergeManagerIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME_1 = "test-auto-forcemerge-one"; + private static final String INDEX_NAME_2 = "test-auto-forcemerge-two"; + private static final int NUM_DOCS_IN_BULK = 1000; + private static final int INGESTION_COUNT = 3; + private static final String SCHEDULER_INTERVAL = "1s"; + private static final String TRANSLOG_AGE = "1s"; + private static final String MERGE_DELAY = "1s"; + private static final Integer SEGMENT_COUNT = 1; + + @Override + protected 
boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .put(NODE_PROCESSORS_SETTING.getKey(), 32) + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.getKey(), SCHEDULER_INTERVAL) + .put(ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE.getKey(), TRANSLOG_AGE) + .put(ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE.getKey(), SEGMENT_COUNT) + .put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), MERGE_DELAY) + .build(); + } + + public void testAutoForceMergeFeatureFlagDisabled() throws InterruptedException, ExecutionException { + + Settings clusterSettings = Settings.builder() + .put(super.nodeSettings(0)) + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), false) + .build(); + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(clusterSettings); + String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst(); + internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst(); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get()); + + // Each ingestion request creates a segment here + for (int i = 0; i < INGESTION_COUNT; i++) { + indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_1); + } + IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1); + assertNotNull(shard); + + // Before stats + SegmentsStats segmentsStatsBefore = 
shard.segmentStats(false, false); + + // This is to make sure auto force merge action gets triggered multiple times and gets successful at least once. + Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3); + // refresh to clear old segments + flushAndRefresh(INDEX_NAME_1); + + // After stats + SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false); + assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount()); + + // Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file + // leaks + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); + } + + public void testAutoForceMergeTriggeringWithOneShardOfNonWarmCandidate() throws Exception { + Settings clusterSettings = Settings.builder() + .put(super.nodeSettings(0)) + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true) + .build(); + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(clusterSettings); + String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst(); + internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst(); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_AUTO_FORCE_MERGES_ENABLED.getKey(), false) + .build(); + assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get()); + for (int i = 0; i < INGESTION_COUNT; i++) { + indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_1); + } + IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1); + assertNotNull(shard); + SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false); + Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3); + SegmentsStats segmentsStatsAfter = 
shard.segmentStats(false, false); + assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount()); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); + } + + public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception { + Settings clusterSettings = Settings.builder() + .put(super.nodeSettings(0)) + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true) + .build(); + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(clusterSettings); + String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst(); + internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst(); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get()); + for (int i = 0; i < INGESTION_COUNT; i++) { + indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_1); + } + IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1); + assertNotNull(shard); + SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false); + waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false); + // assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount()); + // assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount()); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); + } + + public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws Exception { + + Settings clusterSettings = Settings.builder() + .put(super.nodeSettings(0)) + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true) + .build(); + InternalTestCluster internalTestCluster = 
internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(clusterSettings); + String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst(); + internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst(); + assertAcked( + client().admin() + .indices() + .prepareCreate(INDEX_NAME_1) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .get() + ); + assertAcked( + client().admin() + .indices() + .prepareCreate(INDEX_NAME_2) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .get() + ); + for (int i = 0; i < INGESTION_COUNT; i++) { + indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_1); + } + IndexShard shard1 = getIndexShardFromShardId(dataNode, INDEX_NAME_1, 0); + IndexShard shard2 = getIndexShardFromShardId(dataNode, INDEX_NAME_1, 1); + IndexShard shard3 = getIndexShardFromShardId(dataNode, INDEX_NAME_1, 2); + assertNotNull(shard1); + assertNotNull(shard2); + assertNotNull(shard3); + for (int i = 0; i < INGESTION_COUNT; i++) { + indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME_2); + } + IndexShard shard4 = getIndexShardFromShardId(dataNode, INDEX_NAME_2, 0); + IndexShard shard5 = getIndexShardFromShardId(dataNode, INDEX_NAME_2, 1); + assertNotNull(shard4); + assertNotNull(shard5); + + SegmentsStats segmentsStatsForShard1Before = shard1.segmentStats(false, false); + SegmentsStats segmentsStatsForShard2Before = shard2.segmentStats(false, false); + SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false); + SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false); + SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false); + AtomicLong totalSegments = new AtomicLong( + segmentsStatsForShard1Before.getCount() + 
segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount() + + segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount() + ); + assertTrue(totalSegments.get() > 5); + waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard4.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard5.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + SegmentsStats segmentsStatsForShard1After = shard1.segmentStats(false, false); + SegmentsStats segmentsStatsForShard2After = shard2.segmentStats(false, false); + SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false); + SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false); + SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false); + totalSegments.set( + segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount() + + segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount() + ); + // assertEquals(5, totalSegments.get()); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_2).get()); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index cdd86512e49b9..35e2b9f4ebe4b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -116,6 +116,7 @@ import 
org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; +import org.opensearch.index.autoforcemerge.ForceMergeManagerSettings; import org.opensearch.index.compositeindex.CompositeIndexSettings; import org.opensearch.index.remote.RemoteStorePressureSettings; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; @@ -830,7 +831,18 @@ public void apply(Settings value, Settings current, Settings previous) { // Setting related to refresh optimisations IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING, - IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING + IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING, + + // Settings related to Auto Force Merge Manager + ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING, + ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL, + ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER ) ) ); diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 0f26e7bc68422..0ecd24cc8c09a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -278,6 +278,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for search replica IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING, + // Settings 
for Auto Force Merge + IndexSettings.INDEX_AUTO_FORCE_MERGES_ENABLED, // Setting for derived source feature IndexSettings.INDEX_DERIVED_SOURCE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index b53318d491c16..6d79b43fe119e 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -783,6 +783,13 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + public static final Setting INDEX_AUTO_FORCE_MERGES_ENABLED = Setting.boolSetting( + "index.auto_force_merge.enabled", + true, + Property.Dynamic, + Property.IndexScope + ); + public static final Setting INDEX_DERIVED_SOURCE_SETTING = Setting.boolSetting( "index.derived_source.enabled", false, @@ -804,6 +811,7 @@ public static IndexMergePolicy fromString(String text) { private volatile String remoteStoreTranslogRepository; private volatile String remoteStoreRepository; private int remoteTranslogKeepExtraGen; + private boolean autoForcemergeEnabled; private Version extendedCompatibilitySnapshotVersion; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock @@ -979,7 +987,7 @@ public boolean isDerivedFieldAllowed() { * while index level settings will overwrite node settings. * * @param indexMetadata the index metadata this settings object is associated with - * @param nodeSettings the nodes settings this index is allocated on. + * @param nodeSettings the nodes settings this index is allocated on. */ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSettings) { this(indexMetadata, nodeSettings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); @@ -990,7 +998,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti * while index level settings will overwrite node settings. 
* * @param indexMetadata the index metadata this settings object is associated with - * @param nodeSettings the nodes settings this index is allocated on. + * @param nodeSettings the nodes settings this index is allocated on. */ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) { scopedSettings = indexScopedSettings.copy(nodeSettings, indexMetadata); @@ -1195,6 +1203,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this::setRemoteTranslogUploadBufferInterval ); scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); + this.autoForcemergeEnabled = scopedSettings.get(INDEX_AUTO_FORCE_MERGES_ENABLED); + scopedSettings.addSettingsUpdateConsumer(INDEX_AUTO_FORCE_MERGES_ENABLED, this::setAutoForcemergeEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, this::setEnableFuzzySetForDocId); scopedSettings.addSettingsUpdateConsumer( INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, @@ -1325,6 +1335,7 @@ public String customDataPath() { /** * Returns the version the index was created on. + * * @see IndexMetadata#indexCreated(Settings) */ public Version getIndexVersionCreated() { @@ -1365,7 +1376,7 @@ public boolean isCompositeIndex() { /** * Returns true if segment replication is enabled on the index. - * + *

* Every shard on a remote node would also have SegRep enabled even without * proper index setting during the migration. */ @@ -1528,6 +1539,7 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) { /** * Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting * {@code index.translog.durability} is set as {@code request}. + * * @return the buffer interval. */ public TimeValue getRemoteTranslogUploadBufferInterval() { @@ -1553,6 +1565,14 @@ public void setRemoteTranslogKeepExtraGen(int extraGen) { this.remoteTranslogKeepExtraGen = extraGen; } + public void setAutoForcemergeEnabled(boolean autoForcemergeEnabled) { + this.autoForcemergeEnabled = autoForcemergeEnabled; + } + + public boolean isAutoForcemergeEnabled() { + return this.autoForcemergeEnabled; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ @@ -1647,6 +1667,7 @@ private void setMaxInnerResultWindow(int maxInnerResultWindow) { /** * Returns the max number of filters in adjacency_matrix aggregation search requests + * * @deprecated This setting will be removed in 8.0 */ @Deprecated @@ -1719,7 +1740,7 @@ private void setMaxShingleDiff(int maxShingleDiff) { } /** - * Returns the maximum number of chars that will be analyzed in a highlight request + * Returns the maximum number of chars that will be analyzed in a highlight request */ public int getHighlightMaxAnalyzedOffset() { return this.maxAnalyzedOffset; @@ -1730,7 +1751,7 @@ private void setHighlightMaxAnalyzedOffset(int maxAnalyzedOffset) { } /** - * Returns the maximum number of terms that can be used in a Terms Query request + * Returns the maximum number of terms that can be used in a Terms Query request */ public int getMaxTermsCount() { return this.maxTermsCount; @@ -1771,6 +1792,7 @@ public long getGcDeletesInMillis() { /** * Returns the merge policy that should be used for this index. 
+ * * @param isTimeSeriesIndex true if index contains @timestamp field */ public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) { @@ -1931,7 +1953,6 @@ private void setSearchThrottled(boolean searchThrottled) { /** * Returns true if unreferenced files should be cleaned up on merge failure for this index. - * */ public boolean shouldCleanupUnreferencedFiles() { return shouldCleanupUnreferencedFiles; @@ -2034,6 +2055,7 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) { /** * Returns true if we need to maintain backward compatibility for index sorted indices created prior to version 2.7 + * * @return boolean */ public boolean shouldWidenIndexSortType() { diff --git a/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java b/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java new file mode 100644 index 0000000000000..f79b0e72c683a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java @@ -0,0 +1,477 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.autoforcemerge; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.translog.TranslogStats; +import org.opensearch.indices.IndicesService; +import org.opensearch.monitor.MonitorService; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.monitor.jvm.JvmService; +import org.opensearch.monitor.os.OsService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + +/** + * AutoForceMergeManager : Manages automatic force merge operations for indices in OpenSearch. This component monitors and + * triggers force merge on primary shards based on their translog age and system conditions. 
It ensures + * optimal segment counts while respecting node resources and health constraints. Force merge operations + * are executed with configurable delays to prevent system overload. + * + * @opensearch.internal + */ +public class AutoForceMergeManager extends AbstractLifecycleComponent { + + private final ThreadPool threadPool; + private final OsService osService; + private final FsService fsService; + private final JvmService jvmService; + private final IndicesService indicesService; + private final ClusterService clusterService; + private final AsyncForceMergeTask task; + private ConfigurationValidator configurationValidator; + private NodeValidator nodeValidator; + private ShardValidator shardValidator; + private final ForceMergeManagerSettings forceMergeManagerSettings; + private final CommonStatsFlags flags = new CommonStatsFlags(CommonStatsFlags.Flag.Segments, CommonStatsFlags.Flag.Translog); + private final Set mergingShards; + private Integer allocatedProcessors; + + private static final Logger logger = LogManager.getLogger(AutoForceMergeManager.class); + + public AutoForceMergeManager( + ThreadPool threadPool, + MonitorService monitorService, + IndicesService indicesService, + ClusterService clusterService + ) { + this.threadPool = threadPool; + this.osService = monitorService.osService(); + this.fsService = monitorService.fsService(); + this.jvmService = monitorService.jvmService(); + this.clusterService = clusterService; + this.indicesService = indicesService; + this.forceMergeManagerSettings = new ForceMergeManagerSettings(clusterService, this::modifySchedulerInterval); + this.task = new AsyncForceMergeTask(); + this.mergingShards = new HashSet<>(); + } + + @Override + protected void doStart() { + this.configurationValidator = new ConfigurationValidator(); + this.nodeValidator = new NodeValidator(); + this.shardValidator = new ShardValidator(); + this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(clusterService.getSettings()); + } + 
+ @Override + protected void doStop() { + if (task != null) { + this.task.close(); + } + } + + @Override + protected void doClose() { + if (task != null) { + this.task.close(); + } + } + + private void modifySchedulerInterval(TimeValue schedulerInterval) { + this.task.setInterval(schedulerInterval); + } + + private void triggerForceMerge() { + if (configurationValidator.hasWarmNodes() == false) { + logger.debug("No warm nodes found. Skipping Auto Force merge."); + return; + } + if (nodeValidator.validate().isAllowed() == false) { + logger.debug("Node capacity constraints are not allowing to trigger auto ForceMerge"); + return; + } + int iteration = nodeValidator.getMaxConcurrentForceMerges(); + for (IndexShard shard : getShardsBasedOnSorting(indicesService)) { + if (iteration == 0) { + break; + } + if (nodeValidator.validate().isAllowed() == false) { + logger.debug("Node conditions no longer suitable for force merge."); + break; + } + iteration--; + CompletableFuture.runAsync(() -> { + try { + mergingShards.add(shard.shardId().getId()); + shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount())); + logger.debug("Merging is completed successfully for the shard {}", shard.shardId()); + } catch (Exception e) { + logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e); + } finally { + mergingShards.remove(shard.shardId().getId()); + } + }, threadPool.executor(ThreadPool.Names.FORCE_MERGE)); + logger.info("Successfully triggered force merge for shard {}", shard.shardId()); + try { + Thread.sleep(forceMergeManagerSettings.getForcemergeDelay().getMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Timer was interrupted while waiting between shards", e); + break; + } + } + } + + private List getShardsBasedOnSorting(Iterable indicesService) { + Map shardAgeCache = new HashMap<>(); + + return StreamSupport.stream(indicesService.spliterator(), false) 
+ .flatMap(indexService -> StreamSupport.stream(indexService.spliterator(), false)) + .filter(shard -> !shard.shardId().getIndexName().startsWith(".")) + .filter(shard -> shard.routingEntry().primary()) + .filter(shard -> !mergingShards.contains(shard.shardId().getId())) + .filter(shard -> shardValidator.validate(shard).isAllowed()) + .peek(shard -> shardAgeCache.computeIfAbsent(shard, this::getEarliestLastModifiedAge)) + .sorted(new ShardAgeComparator(shardAgeCache)) + .limit(getNodeValidator().getMaxConcurrentForceMerges()) + .collect(Collectors.toList()); + } + + private static class ShardAgeComparator implements Comparator { + private final Map shardAgeCache; + + public ShardAgeComparator(Map shardAgeCache) { + this.shardAgeCache = shardAgeCache; + } + + @Override + public int compare(IndexShard s1, IndexShard s2) { + long age1 = shardAgeCache.get(s1); + long age2 = shardAgeCache.get(s2); + return Long.compare(age1, age2); + } + } + + private long getEarliestLastModifiedAge(IndexShard shard) { + CommonStats stats = new CommonStats(indicesService.getIndicesQueryCache(), shard, flags); + return stats.getTranslog() != null ? stats.getTranslog().getEarliestLastModifiedAge() : 0; + } + + /** + * Validates the node configuration requirements for auto force merge operations. + * This validator ensures that the node meets two primary criteria: + * 1. It must be a dedicated data node (hot node) + * 2. Remote store must be enabled + * The validation is performed once and cached for subsequent checks to improve performance. 
+ */ + protected class ConfigurationValidator implements ValidationStrategy { + + private final boolean isOnlyDataNode; + private boolean isRemoteStoreEnabled = false; + private boolean hasWarmNodes = false; + + ConfigurationValidator() { + DiscoveryNode localNode = clusterService.localNode(); + isOnlyDataNode = localNode.isDataNode() && !localNode.isWarmNode(); + isRemoteStoreEnabled = isRemoteStorageEnabled(); + } + + /** + * Validates the node configuration against required criteria. + * This method first ensures initialization is complete, then checks if the node + * is a dedicated data node with remote store enabled. + * + * @return ValidationResult with true if all configuration requirements are met, + * ValidationResult(false) otherwise. If validation fails, the associated task is closed. + */ + @Override + public ValidationResult validate() { + if (forceMergeManagerSettings.isAutoForceMergeFeatureEnabled() == false) { + logger.debug("Cluster configuration shows auto force merge feature is disabled. Closing task."); + return new ValidationResult(false); + } + if (isRemoteStoreEnabled == false) { + logger.debug("Cluster configuration is not meeting the criteria. Closing task."); + task.close(); + return new ValidationResult(false); + } + if (isOnlyDataNode == false) { + logger.debug("Node configuration doesn't meet requirements. Closing task."); + task.close(); + return new ValidationResult(false); + } + return new ValidationResult(true); + } + + /** + * Checks if remote storage is enabled in the cluster settings. + */ + private boolean isRemoteStorageEnabled() { + return clusterService.getSettings().getAsBoolean(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false); + } + + /** + * Checks if cluster has warm nodes. 
+ */ + private boolean hasWarmNodes() { + if (hasWarmNodes == true) return true; + ClusterState clusterState = clusterService.state(); + return hasWarmNodes = clusterState.getNodes().getNodes().values().stream().anyMatch(DiscoveryNode::isWarmNode); + } + } + + /** + * Validates node-level conditions for force merge operations. + * This validator checks CPU usage, JVM memory usage, and force merge thread availability + * to determine if force merge operations can proceed safely. + */ + protected class NodeValidator implements ValidationStrategy { + + @Override + public ValidationResult validate() { + if (isCpuUsageOverThreshold()) { + return new ValidationResult(false); + } + if (isDiskUsageOverThreshold()) { + return new ValidationResult(false); + } + double jvmUsedPercent = jvmService.stats().getMem().getHeapUsedPercent(); + if (jvmUsedPercent >= forceMergeManagerSettings.getJvmThreshold()) { + logger.debug("JVM memory: {}% breached the threshold: {}", jvmUsedPercent, forceMergeManagerSettings.getJvmThreshold()); + return new ValidationResult(false); + } + if (areForceMergeThreadsAvailable() == false) { + logger.debug("No force merge threads available"); + return new ValidationResult(false); + } + return new ValidationResult(true); + } + + private boolean areForceMergeThreadsAvailable() { + for (ThreadPoolStats.Stats stats : threadPool.stats()) { + if (stats.getName().equals(ThreadPool.Names.FORCE_MERGE)) { + return stats.getQueue() == 0; + } + } + return false; + } + + private boolean isCpuUsageOverThreshold() { + double[] loadAverage = osService.stats().getCpu().getLoadAverage(); + double loadAverage5m = (loadAverage[1] / (double) allocatedProcessors) * 100; + if (loadAverage5m >= forceMergeManagerSettings.getCpuThreshold()) { + logger.debug( + "Load Average: 5m({}%) breached the threshold: {}", + loadAverage5m, + forceMergeManagerSettings.getCpuThreshold() + ); + return true; + } + double loadAverage1m = (loadAverage[0] / (double) allocatedProcessors) * 100; + 
if (loadAverage1m >= forceMergeManagerSettings.getCpuThreshold()) { + logger.debug( + "Load Average: 1m({}%) breached the threshold: {}", + loadAverage1m, + forceMergeManagerSettings.getCpuThreshold() + ); + return true; + } + double cpuPercent = osService.stats().getCpu().getPercent(); + if (cpuPercent >= forceMergeManagerSettings.getCpuThreshold()) { + logger.debug("CPU usage: {} breached the threshold: {}", cpuPercent, forceMergeManagerSettings.getCpuThreshold()); + return true; + } + return false; + } + + private boolean isDiskUsageOverThreshold() { + long total = fsService.stats().getTotal().getTotal().getBytes(); + long available = fsService.stats().getTotal().getAvailable().getBytes(); + double diskPercent = ((double) (total - available) / total) * 100; + if (diskPercent >= forceMergeManagerSettings.getDiskThreshold()) { + logger.debug("Disk usage: {}% breached the threshold: {}", diskPercent, forceMergeManagerSettings.getDiskThreshold()); + return true; + } + return false; + } + + public Integer getMaxConcurrentForceMerges() { + return Math.max(1, (allocatedProcessors / 8)) * forceMergeManagerSettings.getConcurrencyMultiplier(); + } + } + + /** + * Validates shard-level conditions for force merge operations. + * This validator checks segment count and translog age to determine + * if a specific shard is eligible for force merge. 
+ */ + protected class ShardValidator implements ValidationStrategy { + + @Override + public ValidationResult validate(IndexShard shard) { + if (shard.state() != IndexShardState.STARTED) { + logger.debug("Shard({}) skipped: Shard is not in started state.", shard.shardId()); + return new ValidationResult(false); + } + if (isIndexAutoForceMergeEnabled(shard) == false) { + logger.debug("Shard({}) skipped: Shard doesn't belong to a warm candidate index", shard.shardId()); + return new ValidationResult(false); + } + CommonStats stats = new CommonStats(indicesService.getIndicesQueryCache(), shard, flags); + TranslogStats translogStats = stats.getTranslog(); + if (translogStats != null + && translogStats.getEarliestLastModifiedAge() < forceMergeManagerSettings.getTranslogAge().getMillis()) { + logger.debug( + "Shard({}) skipped: Translog is too recent. Age({}ms)", + shard.shardId(), + translogStats.getEarliestLastModifiedAge() + ); + return new ValidationResult(false); + } + SegmentsStats segmentsStats = stats.getSegments(); + if (segmentsStats != null && segmentsStats.getCount() <= forceMergeManagerSettings.getSegmentCount()) { + logger.debug( + "Shard({}) skipped: Shard has {} segments, not exceeding threshold of {}", + shard.shardId(), + segmentsStats.getCount(), + forceMergeManagerSettings.getSegmentCount() + ); + return new ValidationResult(false); + } + return new ValidationResult(true); + } + + private boolean isIndexAutoForceMergeEnabled(IndexShard shard) { + IndexSettings indexSettings = shard.indexSettings(); + return indexSettings.isAutoForcemergeEnabled(); + } + } + + /** + * Strategy interface for implementing different validation approaches + * in the force merge process. Implementations can validate different aspects + * such as node conditions, shard conditions, or custom criteria. 
+ */ + public interface ValidationStrategy { + default ValidationResult validate() { + return new ValidationResult(false); + } + + default ValidationResult validate(IndexShard shard) { + return new ValidationResult(false); + } + } + + /** + * Represents the result of a validation operation. + * This class is immutable and thread-safe. + */ + public static final class ValidationResult { + private final boolean allowed; + + public ValidationResult(boolean allowed) { + this.allowed = allowed; + } + + public boolean isAllowed() { + return allowed; + } + } + + /** + * Asynchronous task that manages force merge operations. + * This task runs periodically to check conditions and trigger force merge + * operations when appropriate. + */ + protected final class AsyncForceMergeTask extends AbstractAsyncTask { + + /** + * Constructs a new AsyncForceMergeTask and initializes its schedule. + */ + public AsyncForceMergeTask() { + super(logger, threadPool, forceMergeManagerSettings.getSchedulerInterval(), true); + rescheduleIfNecessary(); + } + + /** + * Determines if the task should be rescheduled after completion. + * + * @return true to indicate that the task should always be rescheduled + */ + @Override + protected boolean mustReschedule() { + return true; + } + + /** + * Executes the force merge task's core logic. + * Validates configuration and triggers force merge if conditions are met. + */ + @Override + protected void runInternal() { + if (configurationValidator.validate().isAllowed() == false) { + return; + } + triggerForceMerge(); + } + + /** + * Specifies which thread pool should be used for this task. 
+ */ + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + } + + protected AsyncForceMergeTask getTask() { + return task; + } + + protected ConfigurationValidator getConfigurationValidator() { + return configurationValidator; + } + + protected NodeValidator getNodeValidator() { + return nodeValidator; + } + + protected ShardValidator getShardValidator() { + return shardValidator; + } +} diff --git a/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java b/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java new file mode 100644 index 0000000000000..b1d9ccc77988c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java @@ -0,0 +1,242 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.autoforcemerge; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +import java.util.function.Consumer; + +/** + * Settings class that manages configuration parameters for the Auto Force Merge functionality. + * This class handles settings related to force merge operations, including thresholds, + * timing, and operational parameters for the Force Merge Manager. 
+ */ +public class ForceMergeManagerSettings { + + private Integer segmentCount; + private TimeValue forcemergeDelay; + private TimeValue schedulerInterval; + private TimeValue translogAge; + private Double cpuThreshold; + private Double diskThreshold; + private Double jvmThreshold; + private Integer concurrencyMultiplier; + private Boolean autoForceMergeFeatureEnabled; + private final Consumer modifySchedulerInterval; + + /** + * Setting to enable Auto Force Merge (default: false) + */ + public static final Setting AUTO_FORCE_MERGE_SETTING = Setting.boolSetting( + "cluster.auto_force_merge.enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for segment count threshold that triggers force merge (default: 1). + */ + public static final Setting SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE = Setting.intSetting( + "node.auto_force_merge.segment.count", + 1, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for wait time between force merge operations (default: 10s). + */ + public static final Setting MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE = Setting.timeSetting( + "node.auto_force_merge.merge_delay", + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(1), + TimeValue.timeValueSeconds(60), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for scheduler interval (default: 30 minutes). + */ + public static final Setting AUTO_FORCE_MERGE_SCHEDULER_INTERVAL = Setting.timeSetting( + "node.auto_force_merge.scheduler.interval", + TimeValue.timeValueMinutes(30), + TimeValue.timeValueSeconds(1), + TimeValue.timeValueHours(24), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for translog age threshold used by auto force merge (default: 30 minutes). 
+ */ + public static final Setting TRANSLOG_AGE_AUTO_FORCE_MERGE = Setting.timeSetting( + "node.auto_force_merge.translog.age", + TimeValue.timeValueMinutes(30), + TimeValue.timeValueSeconds(1), + TimeValue.timeValueHours(24), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for cpu threshold. (default: 80) + */ + public static final Setting CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting( + "node.auto_force_merge.cpu.threshold", + 80.0, + 10, + 100, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for disk threshold. (default: 90) + */ + public static final Setting DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting( + "node.auto_force_merge.disk.threshold", + 90.0, + 10, + 100, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for jvm threshold. (default: 75) + */ + public static final Setting JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting( + "node.auto_force_merge.jvm.threshold", + 75.0, + 10, + 100, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for thread pool multiplier to determine concurrent force merge operations (default: 2). + */ + public static final Setting CONCURRENCY_MULTIPLIER = Setting.intSetting( + "node.auto_force_merge.threads.concurrency_multiplier", + 2, + 2, + 5, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Creates settings manager with cluster settings for dynamic updates. 
+ */ + public ForceMergeManagerSettings(ClusterService clusterService, Consumer modifySchedulerInterval) { + Settings settings = clusterService.getSettings(); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + this.modifySchedulerInterval = modifySchedulerInterval; + this.autoForceMergeFeatureEnabled = AUTO_FORCE_MERGE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(AUTO_FORCE_MERGE_SETTING, this::setAutoForceMergeFeatureEnabled); + this.schedulerInterval = AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.get(settings); + clusterSettings.addSettingsUpdateConsumer(AUTO_FORCE_MERGE_SCHEDULER_INTERVAL, this::setSchedulerInterval); + this.forcemergeDelay = MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.get(settings); + clusterSettings.addSettingsUpdateConsumer(MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE, this::setForcemergeDelay); + this.translogAge = TRANSLOG_AGE_AUTO_FORCE_MERGE.get(settings); + clusterSettings.addSettingsUpdateConsumer(TRANSLOG_AGE_AUTO_FORCE_MERGE, this::setTranslogAge); + this.segmentCount = SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE.get(settings); + clusterSettings.addSettingsUpdateConsumer(SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE, this::setSegmentCount); + this.cpuThreshold = CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.get(settings); + clusterSettings.addSettingsUpdateConsumer(CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, this::setCpuThreshold); + this.diskThreshold = DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.get(settings); + clusterSettings.addSettingsUpdateConsumer(DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, this::setDiskThreshold); + this.jvmThreshold = JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.get(settings); + clusterSettings.addSettingsUpdateConsumer(JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, this::setJvmThreshold); + this.concurrencyMultiplier = CONCURRENCY_MULTIPLIER.get(settings); + clusterSettings.addSettingsUpdateConsumer(CONCURRENCY_MULTIPLIER, this::setConcurrencyMultiplier); + } + + public 
void setAutoForceMergeFeatureEnabled(Boolean autoForceMergeFeatureEnabled) { + this.autoForceMergeFeatureEnabled = autoForceMergeFeatureEnabled; + } + + public Boolean isAutoForceMergeFeatureEnabled() { + return this.autoForceMergeFeatureEnabled; + } + + public void setSegmentCount(Integer segmentCount) { + this.segmentCount = segmentCount; + } + + public Integer getSegmentCount() { + return this.segmentCount; + } + + public TimeValue getTranslogAge() { + return this.translogAge; + } + + public void setTranslogAge(TimeValue translogAge) { + this.translogAge = translogAge; + } + + public void setForcemergeDelay(TimeValue forcemergeDelay) { + this.forcemergeDelay = forcemergeDelay; + } + + public TimeValue getForcemergeDelay() { + return this.forcemergeDelay; + } + + public void setSchedulerInterval(TimeValue schedulerInterval) { + this.schedulerInterval = schedulerInterval; + this.modifySchedulerInterval.accept(schedulerInterval); + } + + public TimeValue getSchedulerInterval() { + return this.schedulerInterval; + } + + public void setCpuThreshold(Double cpuThreshold) { + this.cpuThreshold = cpuThreshold; + } + + public Double getCpuThreshold() { + return this.cpuThreshold; + } + + public void setDiskThreshold(Double diskThreshold) { + this.diskThreshold = diskThreshold; + } + + public Double getDiskThreshold() { + return this.diskThreshold; + } + + public void setJvmThreshold(Double jvmThreshold) { + this.jvmThreshold = jvmThreshold; + } + + public Double getJvmThreshold() { + return this.jvmThreshold; + } + + public void setConcurrencyMultiplier(Integer concurrencyMultiplier) { + this.concurrencyMultiplier = concurrencyMultiplier; + } + + public Integer getConcurrencyMultiplier() { + return this.concurrencyMultiplier; + } + +} diff --git a/server/src/main/java/org/opensearch/index/autoforcemerge/package-info.java b/server/src/main/java/org/opensearch/index/autoforcemerge/package-info.java new file mode 100644 index 0000000000000..836eef93a5c5b --- /dev/null +++ 
b/server/src/main/java/org/opensearch/index/autoforcemerge/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base Auto Force Merge Manager Service package for tracking and triggering automatic force merges. */ +package org.opensearch.index.autoforcemerge; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 733fa68438165..95424cc9f32f1 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -152,6 +152,7 @@ import org.opensearch.index.IngestionConsumerFactory; import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.index.autoforcemerge.AutoForceMergeManager; import org.opensearch.index.compositeindex.CompositeIndexSettings; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.MergedSegmentWarmerFactory; @@ -370,7 +371,6 @@ public class Node implements Closeable { * Note that this does not control whether the node stores actual indices (see * {@link #NODE_DATA_SETTING}). However, if this is false, {@link #NODE_DATA_SETTING} * and {@link #NODE_MASTER_SETTING} must also be false. 
- * */ public static final Setting NODE_LOCAL_STORAGE_SETTING = Setting.boolSetting( "node.local_storage", @@ -452,7 +452,7 @@ public static class DiscoverySettings { private final LocalNodeFactory localNodeFactory; private final NodeService nodeService; private final Tracer tracer; - + private final AutoForceMergeManager autoForceMergeManager; private final MetricsRegistry metricsRegistry; final NamedWriteableRegistry namedWriteableRegistry; private final AtomicReference runnableTaskListener; @@ -1172,6 +1172,8 @@ protected Node(final Environment initialEnvironment, Collection clas workloadGroupService ); + this.autoForceMergeManager = new AutoForceMergeManager(threadPool, monitorService, indicesService, clusterService); + final Collection secureSettingsFactories = pluginsService.filterPlugins(Plugin.class) .stream() .map(p -> p.getSecureSettingFactory(settings)) @@ -1585,6 +1587,7 @@ protected Node(final Environment initialEnvironment, Collection clas b.bind(SegmentReplicator.class).toInstance(segmentReplicator); b.bind(MergedSegmentWarmerFactory.class).toInstance(mergedSegmentWarmerFactory); b.bind(MappingTransformerRegistry.class).toInstance(mappingTransformerRegistry); + b.bind(AutoForceMergeManager.class).toInstance(autoForceMergeManager); taskManagerClientOptional.ifPresent(value -> b.bind(TaskManagerClient.class).toInstance(value)); }); @@ -1787,6 +1790,7 @@ public Node start() throws NodeValidationException { // start after transport service so the local disco is known discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService clusterService.start(); + this.autoForceMergeManager.start(); assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); @@ -1881,7 +1885,7 @@ private Node stop() { injector.getInstance(SearchService.class).stop(); 
injector.getInstance(TransportService.class).stop(); nodeService.getTaskCancellationMonitoringService().stop(); - + autoForceMergeManager.stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); // we should stop this last since it waits for resources to get released // if we had scroll searchers etc or recovery going on we wait for to finish. @@ -1981,7 +1985,7 @@ public synchronized void close() throws IOException { if (logger.isTraceEnabled()) { toClose.add(() -> logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint())); } - + autoForceMergeManager.stop(); IOUtils.close(toClose); logger.info("closed"); } diff --git a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java new file mode 100644 index 0000000000000..5a8d7fc16ad3e --- /dev/null +++ b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java @@ -0,0 +1,597 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.autoforcemerge; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.set.Sets; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.translog.TranslogStats; +import org.opensearch.indices.IndicesService; +import org.opensearch.monitor.MonitorService; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.monitor.jvm.JvmService; +import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.monitor.os.OsService; +import org.opensearch.monitor.os.OsStats; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; 
+import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.index.IndexSettingsTests.newIndexMeta; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AutoForceMergeManagerTests extends OpenSearchTestCase { + + private ClusterService clusterService; + private IndicesService indicesService; + private MonitorService monitorService; + private OsService osService; + private FsService fsService; + private JvmService jvmService; + private ThreadPool threadPool; + private OsStats.Cpu cpu; + private FsInfo.Path disk; + private JvmStats.Mem jvm; + + private final String DATA_NODE_1 = "DATA_NODE_1"; + private final String DATA_NODE_2 = "DATA_NODE_2"; + private final String WARM_NODE_1 = "WARM_NODE_1"; + private final String WARM_NODE_2 = "WARM_NODE_2"; + private final String TEST_INDEX_1 = "TEST_INDEX_1"; + private final String TEST_INDEX_2 = "TEST_INDEX_2"; + + private final String SCHEDULER_INTERVAL = "1s"; + private final String TRANSLOG_AGE = "1s"; + private final String MERGE_DELAY = "1s"; + private Integer allocatedProcessors; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = mock(ThreadPool.class); + clusterService = mock(ClusterService.class); + indicesService = mock(IndicesService.class); + monitorService = mock(MonitorService.class); + osService = mock(OsService.class); + fsService = mock(FsService.class); + jvmService = mock(JvmService.class); + + when(monitorService.osService()).thenReturn(osService); + 
when(monitorService.fsService()).thenReturn(fsService); + when(monitorService.jvmService()).thenReturn(jvmService); + + OsStats osStats = mock(OsStats.class); + cpu = mock(OsStats.Cpu.class); + when(osService.stats()).thenReturn(osStats); + when(osStats.getCpu()).thenReturn(cpu); + + FsInfo fsInfo = mock(FsInfo.class); + disk = mock(FsInfo.Path.class); + when(fsService.stats()).thenReturn(fsInfo); + when(fsInfo.getTotal()).thenReturn(disk); + when(disk.getTotal()).thenReturn(new ByteSizeValue(100)); + when(disk.getAvailable()).thenReturn(new ByteSizeValue(50)); + + JvmStats jvmStats = mock(JvmStats.class); + jvm = mock(JvmStats.Mem.class); + when(jvmService.stats()).thenReturn(jvmStats); + when(jvmStats.getMem()).thenReturn(jvm); + + allocatedProcessors = OpenSearchExecutors.allocatedProcessors(Settings.EMPTY); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + // ConfigurationValidator Tests + public void testConfigurationValidatorWithFeatureDisabled() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(false, false, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getConfigurationValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testConfigurationValidatorWithDataNodeAndNonRemoteStore() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, false, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + autoForceMergeManager.getTask().runInternal(); + assertFalse(autoForceMergeManager.getConfigurationValidator().validate().isAllowed()); + } + + public void testConfigurationValidatorWithDataNodeAndRemoteStore() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + 
getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + assertTrue(autoForceMergeManager.getConfigurationValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testConfigurationValidatorWithNonDataNode() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(WARM_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.WARM_ROLE)) + ); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getConfigurationValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + // NodeValidator Tests + public void testNodeValidatorWithHealthyResources() { + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + when(jvm.getHeapUsedPercent()).thenReturn((short) 60); + ThreadPoolStats stats = new ThreadPoolStats( + Arrays.asList(new ThreadPoolStats.Stats( + ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0 + )) + ); + when(threadPool.stats()).thenReturn(stats); + + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); + autoForceMergeManager.start(); + assertTrue(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testNodeValidatorWithHighCPU() { + when(cpu.getPercent()).thenReturn((short) 95); + when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + DiscoveryNode dataNode1 = getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode warmNode1 = 
getNodeWithRoles(WARM_NODE_1, Set.of(DiscoveryNodeRole.WARM_ROLE)); + ClusterState clusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + .nodes( + DiscoveryNodes.builder() + .add(dataNode1) + .add(warmNode1) + .localNodeId(dataNode1.getId()) + .clusterManagerNodeId(dataNode1.getId()) + ) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build(); + when(clusterService.state()).thenReturn(clusterState); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), dataNode1); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn(new double[]{0.9 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.9 * allocatedProcessors, 0.5 * allocatedProcessors}); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testNodeValidatorWithHighDiskUsage() { + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + when(disk.getAvailable()).thenReturn(new ByteSizeValue(5)); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testNodeValidatorWithHighJVMUsage() { + when(cpu.getPercent()).thenReturn((short) 50); + 
when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + when(jvm.getHeapUsedPercent()).thenReturn((short) 90); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testNodeValidatorWithInsufficientForceMergeThreads() { + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + when(jvm.getHeapUsedPercent()).thenReturn((short) 50); + ThreadPoolStats stats = new ThreadPoolStats( + Arrays.asList(new ThreadPoolStats.Stats( + ThreadPool.Names.FORCE_MERGE, 1, 1, 1, 0, 1, 0, -1 + )) + ); + when(threadPool.stats()).thenReturn(stats); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + ThreadPoolStats emptyStats = new ThreadPoolStats(Collections.emptyList()); + when(threadPool.stats()).thenReturn(emptyStats); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + // ShardValidator Tests + public void testShardValidatorWithValidShard() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + TranslogStats translogStats = new TranslogStats(0, 0, 
0, 0, TimeValue.timeValueSeconds(6).getMillis()); + IndexShard shard = getShard(TEST_INDEX_1, translogStats, 2); + assertTrue(autoForceMergeManager.getShardValidator().validate(shard).isAllowed()); + autoForceMergeManager.close(); + } + + public void testShardValidatorWithShardNotInStartedState() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + TranslogStats translogStats = new TranslogStats(0, 0, 0, 0, TimeValue.timeValueSeconds(6).getMillis()); + IndexShard shard = getShard(TEST_INDEX_1, translogStats, 2); + when(shard.state()).thenReturn(IndexShardState.RECOVERING); + assertFalse(autoForceMergeManager.getShardValidator().validate(shard).isAllowed()); + autoForceMergeManager.close(); + } + + public void testShardValidatorWithForbiddenAutoForceMergesSetting() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + IndexShard shard = getShard(TEST_INDEX_1, mock(TranslogStats.class), 1); + IndexSettings indexSettings = getNewIndexSettings(TEST_INDEX_1); + indexSettings.setAutoForcemergeEnabled(false); + when(shard.indexSettings()).thenReturn(indexSettings); + assertFalse(autoForceMergeManager.getShardValidator().validate(shard).isAllowed()); + autoForceMergeManager.close(); + } + + public void testShardValidatorWithLowSegmentCount() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + TranslogStats translogStats = new TranslogStats(0, 0, 0, 0, TimeValue.timeValueSeconds(5).getMillis()); + 
IndexShard shard = getShard(TEST_INDEX_1, translogStats, 1); + assertFalse(autoForceMergeManager.getShardValidator().validate(shard).isAllowed()); + autoForceMergeManager.close(); + } + + public void testShardValidatorWithRecentTranslog() { + Map additionalSettings = new HashMap<>(); + additionalSettings.put(ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE.getKey(), "2s"); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, additionalSettings), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + TranslogStats translogStats = new TranslogStats(0, 0, 0, 0, TimeValue.timeValueSeconds(1).getMillis()); + IndexShard shard = getShard(TEST_INDEX_1, translogStats, 2); + assertFalse(autoForceMergeManager.getShardValidator().validate(shard).isAllowed()); + autoForceMergeManager.close(); + } + + public void testShardValidatorWithoutShard() { + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)) + ); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getShardValidator().validate().isAllowed()); + autoForceMergeManager.close(); + } + + public void testForceMergeOperationOnWarmDisabledCluster() { + DiscoveryNode dataNode1 = getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode dataNode2 = getNodeWithRoles(DATA_NODE_2, Set.of(DiscoveryNodeRole.DATA_ROLE)); + ClusterState clusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + .nodes( + DiscoveryNodes.builder() + .add(dataNode1) + .add(dataNode2) + .localNodeId(dataNode1.getId()) + .clusterManagerNodeId(dataNode1.getId()) + ) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build(); + when(clusterService.state()).thenReturn(clusterState); + AutoForceMergeManager 
autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + dataNode1 + ); + autoForceMergeManager.start(); + autoForceMergeManager.getTask().runInternal(); + verify(cpu, never()).getPercent(); + autoForceMergeManager.close(); + } + + public void testForceMergeOperationOnDataNodeWithFailingMerges() throws IOException { + DiscoveryNode dataNode1 = getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode dataNode2 = getNodeWithRoles(DATA_NODE_2, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode warmNode1 = getNodeWithRoles(WARM_NODE_1, Set.of(DiscoveryNodeRole.WARM_ROLE)); + DiscoveryNode warmNode2 = getNodeWithRoles(WARM_NODE_2, Set.of(DiscoveryNodeRole.WARM_ROLE)); + ClusterState clusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + .nodes( + DiscoveryNodes.builder() + .add(dataNode1) + .add(dataNode2) + .add(warmNode1) + .add(warmNode2) + .localNodeId(dataNode1.getId()) + .clusterManagerNodeId(dataNode1.getId()) + ) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build(); + when(clusterService.state()).thenReturn(clusterState); + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn( + new double[] { 0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors } + ); + when(jvm.getHeapUsedPercent()).thenReturn((short) 50); + + int forceMergeThreads = 4; + ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); + when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService); + ThreadPoolStats stats = new ThreadPoolStats( + Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1)) + ); + when(threadPool.stats()).thenReturn(stats); + + IndexService indexService1 = mock(IndexService.class); + TranslogStats translogStats = new TranslogStats(0, 0, 0, 0, 
TimeValue.timeValueSeconds(6).getMillis()); + IndexShard shard1 = getShard(TEST_INDEX_1, translogStats, 2); + List indexShards1 = List.of(shard1); + when(indexService1.spliterator()).thenReturn(indexShards1.spliterator()); + List indexServices = List.of(indexService1); + when(indicesService.spliterator()).thenReturn(indexServices.spliterator()); + when(shard1.indexSettings()).thenReturn(getNewIndexSettings(TEST_INDEX_1)); + when(shard1.state()).thenReturn(IndexShardState.STARTED); + + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + dataNode1 + ); + autoForceMergeManager.start(); + doThrow(new IOException("Testing")).when(shard1).forceMerge(any()); + autoForceMergeManager.getTask().runInternal(); + verify(shard1, times(1)).forceMerge(any()); + autoForceMergeManager.close(); + executorService.shutdown(); + } + + public void testForceMergeOperationOnDataNodeOfWarmEnabledCluster() throws IOException { + DiscoveryNode dataNode1 = getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode dataNode2 = getNodeWithRoles(DATA_NODE_2, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode warmNode1 = getNodeWithRoles(WARM_NODE_1, Set.of(DiscoveryNodeRole.WARM_ROLE)); + DiscoveryNode warmNode2 = getNodeWithRoles(WARM_NODE_2, Set.of(DiscoveryNodeRole.WARM_ROLE)); + ClusterState clusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + .nodes( + DiscoveryNodes.builder() + .add(dataNode1) + .add(dataNode2) + .add(warmNode1) + .add(warmNode2) + .localNodeId(dataNode1.getId()) + .clusterManagerNodeId(dataNode1.getId()) + ) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build(); + when(clusterService.state()).thenReturn(clusterState); + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn( + new double[] { 0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors } + ); + 
when(jvm.getHeapUsedPercent()).thenReturn((short) 50); + int forceMergeThreads = 4; + ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); + when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService); + ThreadPoolStats stats = new ThreadPoolStats( + Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1)) + ); + when(threadPool.stats()).thenReturn(stats); + IndexService indexService1 = mock(IndexService.class); + TranslogStats translogStats = new TranslogStats(0, 0, 0, 0, TimeValue.timeValueSeconds(1).getMillis()); + IndexShard shard1 = getShard(TEST_INDEX_1, translogStats, 2); + List indexShards1 = List.of(shard1); + when(indexService1.spliterator()).thenReturn(indexShards1.spliterator()); + IndexService indexService2 = mock(IndexService.class); + IndexShard shard2 = getShard(TEST_INDEX_2, translogStats, 2); + List indexShards2 = List.of(shard2); + when(indexService2.spliterator()).thenReturn(indexShards2.spliterator()); + List indexServices = Arrays.asList(indexService1, indexService2); + when(indicesService.spliterator()).thenReturn(indexServices.spliterator()); + when(shard1.indexSettings()).thenReturn(getNewIndexSettings(TEST_INDEX_1)); + when(shard1.state()).thenReturn(IndexShardState.STARTED); + when(shard2.indexSettings()).thenReturn(getNewIndexSettings(TEST_INDEX_2)); + when(shard2.state()).thenReturn(IndexShardState.STARTED); + + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + dataNode1 + ); + autoForceMergeManager.start(); + autoForceMergeManager.getTask().runInternal(); + verify(shard1, atLeastOnce()).forceMerge(any()); + verify(shard2, atLeastOnce()).forceMerge(any()); + autoForceMergeManager.close(); + executorService.shutdown(); + } + + public void testForceMergeOperationOnDataNodeWithThreadInterruption() throws InterruptedException { + 
DiscoveryNode dataNode1 = getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode dataNode2 = getNodeWithRoles(DATA_NODE_2, Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode warmNode1 = getNodeWithRoles(WARM_NODE_1, Set.of(DiscoveryNodeRole.WARM_ROLE)); + DiscoveryNode warmNode2 = getNodeWithRoles(WARM_NODE_2, Set.of(DiscoveryNodeRole.WARM_ROLE)); + ClusterState clusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + .nodes( + DiscoveryNodes.builder() + .add(dataNode1) + .add(dataNode2) + .add(warmNode1) + .add(warmNode2) + .localNodeId(dataNode1.getId()) + .clusterManagerNodeId(dataNode1.getId()) + ) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build(); + when(clusterService.state()).thenReturn(clusterState); + when(cpu.getPercent()).thenReturn((short) 50); + when(cpu.getLoadAverage()).thenReturn( + new double[] { 0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors } + ); + when(jvm.getHeapUsedPercent()).thenReturn((short) 50); + + int forceMergeThreads = 4; + ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); + when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService); + ThreadPoolStats stats = new ThreadPoolStats( + Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1)) + ); + when(threadPool.stats()).thenReturn(stats); + + IndexService indexService1 = mock(IndexService.class); + TranslogStats translogStats = new TranslogStats(0, 0, 0, 0, TimeValue.timeValueSeconds(1).getMillis()); + List indexShards1 = List.of(getShard(TEST_INDEX_1, translogStats, 2)); + when(indexService1.spliterator()).thenReturn(indexShards1.spliterator()); + List indexServices = List.of(indexService1); + when(indicesService.spliterator()).thenReturn(indexServices.spliterator()); + + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + 
getConfiguredClusterSettings(true, true, Collections.emptyMap()), + dataNode1 + ); + autoForceMergeManager.start(); + Thread testThread = new Thread(() -> autoForceMergeManager.getTask().runInternal()); + testThread.start(); + Thread.sleep(1000L); + testThread.interrupt(); + assertTrue(testThread.isInterrupted()); + autoForceMergeManager.close(); + executorService.shutdown(); + } + + private DiscoveryNode getNodeWithRoles(String name, Set roles) { + return new DiscoveryNode(name, buildNewFakeTransportAddress(), new HashMap<>(), Sets.newHashSet(roles), Version.CURRENT); + } + + private AutoForceMergeManager clusterSetupWithNode(Settings settings, DiscoveryNode node) { + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getSettings()).thenReturn(settings); + when(clusterService.localNode()).thenReturn(node); + + return new AutoForceMergeManager( + threadPool, + monitorService, + indicesService, + clusterService + ); + } + + private IndexShard getShard(String indexName, TranslogStats translogStats, Integer segmentCount) { + IndexShard shard = mock(IndexShard.class); + ShardId shardId1 = new ShardId(indexName, "_na_", 0); + SegmentsStats segmentsStats = new SegmentsStats(); + segmentsStats.add(segmentCount); + ShardRouting shardRouting = mock(ShardRouting.class); + when(shard.shardId()).thenReturn(shardId1); + when(shard.translogStats()).thenReturn(translogStats); + when(shard.segmentStats(false, false)).thenReturn(segmentsStats); + when(shard.routingEntry()).thenReturn(shardRouting); + when(shardRouting.primary()).thenReturn(true); + when(shard.state()).thenReturn(IndexShardState.STARTED); + when(shard.indexSettings()).thenReturn(getNewIndexSettings(TEST_INDEX_1)); + return shard; + } + + private IndexSettings getNewIndexSettings(String indexName) { + return new IndexSettings( + newIndexMeta( + indexName, + Settings.builder() + 
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") + .put(IndexSettings.INDEX_AUTO_FORCE_MERGES_ENABLED.getKey(), true) + .build() + ), + Settings.EMPTY + ); + } + + private Settings getConfiguredClusterSettings(Boolean featureEnabled, Boolean remoteEnabled, Map additionalSettings) { + Settings.Builder settingsBuilder = Settings.builder() + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), featureEnabled) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), remoteEnabled) + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.getKey(), SCHEDULER_INTERVAL) + .put(ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE.getKey(), TRANSLOG_AGE) + .put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), MERGE_DELAY); + if (additionalSettings != null) { + additionalSettings.forEach((key, value) -> { + if (value != null) { + switch (value) { + case Boolean b -> settingsBuilder.put(key, b); + case Integer i -> settingsBuilder.put(key, i); + case Long l -> settingsBuilder.put(key, l); + case Double v -> settingsBuilder.put(key, v); + case String s -> settingsBuilder.put(key, s); + case TimeValue timeValue -> settingsBuilder.put(key, timeValue); + default -> settingsBuilder.put(key, value.toString()); + } + } + }); + } + return settingsBuilder.build(); + } +} diff --git a/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java b/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java new file mode 100644 index 0000000000000..cb3c008f0de08 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 
license or a + * compatible open source license. + */ + +package org.opensearch.index.autoforcemerge; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.Set; +import java.util.function.Consumer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ForceMergeManagerSettingsTests extends OpenSearchTestCase { + + private ClusterSettings clusterSettings; + private Settings settings; + private ForceMergeManagerSettings forceMergeManagerSettings; + + @Before + public void setUp() throws Exception { + super.setUp(); + settings = Settings.builder().build(); + clusterSettings = new ClusterSettings( + settings, + Set.of( + ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING, + ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL, + ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, + ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER + ) + ); + Consumer modifySchedulerInterval = new Consumer<>() { + private TimeValue schedulerInterval; + + @Override + public void accept(TimeValue timeValue) { + this.schedulerInterval = timeValue; + } + }; + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.getSettings()).thenReturn(settings); + forceMergeManagerSettings = new ForceMergeManagerSettings(clusterService, 
modifySchedulerInterval); + } + + public void testDefaultSettings() { + assertEquals(false, forceMergeManagerSettings.isAutoForceMergeFeatureEnabled()); + assertEquals(forceMergeManagerSettings.getForcemergeDelay(), TimeValue.timeValueSeconds(10)); + assertEquals(forceMergeManagerSettings.getSchedulerInterval(), TimeValue.timeValueMinutes(30)); + assertEquals(2, (int) forceMergeManagerSettings.getConcurrencyMultiplier()); + assertEquals(1, (int) forceMergeManagerSettings.getSegmentCount()); + assertEquals(80.0, forceMergeManagerSettings.getCpuThreshold(), 0.0); + assertEquals(75.0, forceMergeManagerSettings.getJvmThreshold(), 0.0); + } + + public void testDynamicSettingsUpdate() { + Settings newSettings = Settings.builder() + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), false) + .put(ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE.getKey(), 30) + .build(); + + clusterSettings.applySettings(newSettings); + + assertEquals(false, forceMergeManagerSettings.isAutoForceMergeFeatureEnabled()); + assertEquals(30, (int) forceMergeManagerSettings.getSegmentCount()); + } + + public void testInvalidSettings() { + expectThrows(IllegalArgumentException.class, () -> { + Settings invalidSettings = Settings.builder() + .put(ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE.getKey(), -1) + .build(); + clusterSettings.applySettings(invalidSettings); + }); + expectThrows(IllegalArgumentException.class, () -> { + Settings invalidSettings = Settings.builder() + .put(ForceMergeManagerSettings.CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.getKey(), 101.0) + .build(); + clusterSettings.applySettings(invalidSettings); + }); + } + + public void testTimeValueSettings() { + Settings newSettings = Settings.builder() + .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.getKey(), "10m") + .put(ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE.getKey(), "10m") + 
.put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), "15s") + .build(); + + clusterSettings.applySettings(newSettings); + + assertEquals(forceMergeManagerSettings.getSchedulerInterval(), TimeValue.timeValueMinutes(10)); + assertEquals(forceMergeManagerSettings.getTranslogAge(), TimeValue.timeValueMinutes(10)); + assertEquals(forceMergeManagerSettings.getForcemergeDelay(), TimeValue.timeValueSeconds(15)); + } + + public void testThreadSettings() { + Settings newSettings = Settings.builder().put(ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER.getKey(), 5).build(); + + clusterSettings.applySettings(newSettings); + + assertEquals(5, (int) forceMergeManagerSettings.getConcurrencyMultiplier()); + } + + public void testThresholdSettings() { + Settings newSettings = Settings.builder() + .put(ForceMergeManagerSettings.CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.getKey(), 60.0) + .put(ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.getKey(), 70.0) + .put(ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE.getKey(), 70.0) + .build(); + + clusterSettings.applySettings(newSettings); + assertEquals(60.0, forceMergeManagerSettings.getCpuThreshold(), 0.0); + assertEquals(70.0, forceMergeManagerSettings.getDiskThreshold(), 0.0); + assertEquals(70.0, forceMergeManagerSettings.getJvmThreshold(), 0.0); + } + +} diff --git a/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index e8abcbb5f4fee..58ace74154514 100644 --- a/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -337,6 +337,16 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe return indexService.getShard(0); } + protected IndexShard 
getIndexShardFromShardId(String dataNode, String indexName, Integer shardId) throws ExecutionException,
        InterruptedException {
        // Resolves the shard with the given id on the given data node. The index UUID is
        // read from the cluster manager's get-index response so the IndicesService lookup
        // uses the exact Index (name + uuid) currently held by the cluster state.
        String clusterManagerName = internalCluster().getClusterManagerName();
        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
        GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
        String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID);
        IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
        return indexService.getShard(shardId);
    }

    protected void restore(boolean restoreAllShards, String... indices) {
        if (restoreAllShards) {
            assertAcked(client().admin().indices().prepareClose(indices));