Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix the visit of sub queries for HasParentQuery and HasChildQuery ([#18621](https://github.com/opensearch-project/OpenSearch/pull/18621))
- Fix the backward compatibility regression with COMPLEMENT for Regexp queries introduced in OpenSearch 3.0 ([#18640](https://github.com/opensearch-project/OpenSearch/pull/18640))
- Fix Replication lag computation ([#18602](https://github.com/opensearch-project/OpenSearch/pull/18602))
- Fixed Staggered merge - load average replace with AverageTrackers, some Default thresholds modified ([#18666](https://github.com/opensearch-project/OpenSearch/pull/18666))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public class AutoForceMergeManagerIT extends RemoteStoreBaseIntegTestCase {
private static final String MERGE_DELAY = "1s";
private static final Integer SEGMENT_COUNT = 1;

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Expand Down Expand Up @@ -158,8 +153,8 @@ public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception {
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
// assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount());
// assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount());
assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount());
assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount());
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
}

Expand Down Expand Up @@ -221,11 +216,11 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false);
SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false);
SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false);
AtomicLong totalSegments = new AtomicLong(
AtomicLong totalSegmentsBefore = new AtomicLong(
segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount()
+ segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount()
);
assertTrue(totalSegments.get() > 5);
assertTrue(totalSegmentsBefore.get() > 5);
waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
Expand All @@ -236,11 +231,11 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false);
SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false);
SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false);
totalSegments.set(
AtomicLong totalSegmentsAfter = new AtomicLong(
segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount()
+ segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount()
);
// assertEquals(5, totalSegments.get());
assertTrue(totalSegmentsBefore.get() > totalSegmentsAfter.get());
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_2).get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent {
private ConfigurationValidator configurationValidator;
private NodeValidator nodeValidator;
private ShardValidator shardValidator;
private Integer allocatedProcessors;
private ResourceTrackerProvider.ResourceTrackers resourceTrackers;
private final ForceMergeManagerSettings forceMergeManagerSettings;
private final CommonStatsFlags flags = new CommonStatsFlags(CommonStatsFlags.Flag.Segments, CommonStatsFlags.Flag.Translog);
private final Set<Integer> mergingShards;
private Integer allocatedProcessors;

private static final Logger logger = LogManager.getLogger(AutoForceMergeManager.class);

Expand All @@ -96,6 +97,7 @@ protected void doStart() {
this.nodeValidator = new NodeValidator();
this.shardValidator = new ShardValidator();
this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(clusterService.getSettings());
this.resourceTrackers = ResourceTrackerProvider.create(threadPool);
}

@Override
Expand All @@ -117,43 +119,65 @@ private void modifySchedulerInterval(TimeValue schedulerInterval) {
}

private void triggerForceMerge() {
if (isValidForForceMerge() == false) {
return;
}
executeForceMergeOnShards();
}

private boolean isValidForForceMerge() {
if (configurationValidator.hasWarmNodes() == false) {
resourceTrackers.stop();
logger.debug("No warm nodes found. Skipping Auto Force merge.");
return;
return false;
}
if (nodeValidator.validate().isAllowed() == false) {
logger.debug("Node capacity constraints are not allowing to trigger auto ForceMerge");
return;
return false;
}
int iteration = nodeValidator.getMaxConcurrentForceMerges();
return true;
}

private void executeForceMergeOnShards() {
int remainingIterations = nodeValidator.getMaxConcurrentForceMerges();
for (IndexShard shard : getShardsBasedOnSorting(indicesService)) {
if (iteration == 0) {
if (remainingIterations == 0 || !nodeValidator.validate().isAllowed()) {
if (remainingIterations > 0) {
logger.debug("Node conditions no longer suitable for force merge.");
}
break;
}
if (nodeValidator.validate().isAllowed() == false) {
logger.debug("Node conditions no longer suitable for force merge.");
remainingIterations--;
executeForceMergeForShard(shard);
if (!waitBetweenShards()) {
break;
}
iteration--;
CompletableFuture.runAsync(() -> {
try {
mergingShards.add(shard.shardId().getId());
shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount()));
logger.debug("Merging is completed successfully for the shard {}", shard.shardId());
} catch (Exception e) {
logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e);
} finally {
mergingShards.remove(shard.shardId().getId());
}
}, threadPool.executor(ThreadPool.Names.FORCE_MERGE));
logger.info("Successfully triggered force merge for shard {}", shard.shardId());
}
}

private void executeForceMergeForShard(IndexShard shard) {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(forceMergeManagerSettings.getForcemergeDelay().getMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Timer was interrupted while waiting between shards", e);
break;
mergingShards.add(shard.shardId().getId());
shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount()));
logger.debug("Merging is completed successfully for the shard {}", shard.shardId());
} catch (Exception e) {
logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e);
} finally {
mergingShards.remove(shard.shardId().getId());
}
}, threadPool.executor(ThreadPool.Names.FORCE_MERGE));
logger.info("Successfully triggered force merge for shard {}", shard.shardId());
}

private boolean waitBetweenShards() {
try {
Thread.sleep(forceMergeManagerSettings.getForcemergeDelay().getMillis());
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Timer was interrupted while waiting between shards", e);
return false;
}
}

Expand Down Expand Up @@ -264,15 +288,14 @@ protected class NodeValidator implements ValidationStrategy {

@Override
public ValidationResult validate() {
resourceTrackers.start();
if (isCpuUsageOverThreshold()) {
return new ValidationResult(false);
}
if (isDiskUsageOverThreshold()) {
return new ValidationResult(false);
}
double jvmUsedPercent = jvmService.stats().getMem().getHeapUsedPercent();
if (jvmUsedPercent >= forceMergeManagerSettings.getJvmThreshold()) {
logger.debug("JVM memory: {}% breached the threshold: {}", jvmUsedPercent, forceMergeManagerSettings.getJvmThreshold());
if (isJvmUsageOverThreshold()) {
return new ValidationResult(false);
}
if (areForceMergeThreadsAvailable() == false) {
Expand All @@ -291,24 +314,34 @@ private boolean areForceMergeThreadsAvailable() {
return false;
}

private boolean isJvmUsageOverThreshold() {
double jvmAverage = resourceTrackers.jvmFiveMinute.getAverage();
if (jvmAverage >= forceMergeManagerSettings.getJvmThreshold()) {
logger.debug("JVM Average: 5m({}%) breached the threshold: {}", jvmAverage, forceMergeManagerSettings.getJvmThreshold());
return true;
}
jvmAverage = resourceTrackers.jvmOneMinute.getAverage();
if (jvmAverage >= forceMergeManagerSettings.getJvmThreshold()) {
logger.debug("JVM Average: 1m({}%) breached the threshold: {}", jvmAverage, forceMergeManagerSettings.getJvmThreshold());
return true;
}
double jvmUsedPercent = jvmService.stats().getMem().getHeapUsedPercent();
if (jvmUsedPercent >= forceMergeManagerSettings.getJvmThreshold()) {
logger.debug("JVM memory: {}% breached the threshold: {}", jvmUsedPercent, forceMergeManagerSettings.getJvmThreshold());
return true;
}
return false;
}

private boolean isCpuUsageOverThreshold() {
double[] loadAverage = osService.stats().getCpu().getLoadAverage();
double loadAverage5m = (loadAverage[1] / (double) allocatedProcessors) * 100;
if (loadAverage5m >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug(
"Load Average: 5m({}%) breached the threshold: {}",
loadAverage5m,
forceMergeManagerSettings.getCpuThreshold()
);
double cpuAverage = resourceTrackers.cpuFiveMinute.getAverage();
if (cpuAverage >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug("CPU Average: 5m({}%) breached the threshold: {}", cpuAverage, forceMergeManagerSettings.getCpuThreshold());
return true;
}
double loadAverage1m = (loadAverage[0] / (double) allocatedProcessors) * 100;
if (loadAverage1m >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug(
"Load Average: 1m({}%) breached the threshold: {}",
loadAverage1m,
forceMergeManagerSettings.getCpuThreshold()
);
cpuAverage = resourceTrackers.cpuOneMinute.getAverage();
if (cpuAverage >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug("CPU Average: 1m({}%) breached the threshold: {}", cpuAverage, forceMergeManagerSettings.getCpuThreshold());
return true;
}
double cpuPercent = osService.stats().getCpu().getPercent();
Expand Down Expand Up @@ -445,6 +478,7 @@ protected boolean mustReschedule() {
@Override
protected void runInternal() {
if (configurationValidator.validate().isAllowed() == false) {
resourceTrackers.stop();
return;
}
triggerForceMerge();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ public class ForceMergeManagerSettings {
);

/**
* Setting for wait time between force merge operations (default: 10s).
* Setting for wait time between force merge operations (default: 15s).
*/
public static final Setting<TimeValue> MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE = Setting.timeSetting(
"node.auto_force_merge.merge_delay",
TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(15),
TimeValue.timeValueSeconds(1),
TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic,
Expand Down Expand Up @@ -92,23 +92,23 @@ public class ForceMergeManagerSettings {
);

/**
* Setting for cpu threshold. (default: 80)
* Setting for cpu threshold. (default: 75)
*/
public static final Setting<Double> CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting(
"node.auto_force_merge.cpu.threshold",
80.0,
75.0,
10,
100,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Setting for memory threshold. (default: 90)
* Setting for disk threshold. (default: 85)
*/
public static final Setting<Double> DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting(
"node.auto_force_merge.disk.threshold",
90.0,
85.0,
10,
100,
Setting.Property.Dynamic,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.autoforcemerge;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.resource.tracker.AverageCpuUsageTracker;
import org.opensearch.node.resource.tracker.AverageMemoryUsageTracker;
import org.opensearch.threadpool.ThreadPool;

/**
* Provider for creating resource usage trackers used in auto force merge operations.
*
* @opensearch.internal
*/
public class ResourceTrackerProvider {

public static final TimeValue AVERAGE_WINDOW_ONE_SECOND = TimeValue.timeValueSeconds(6);
public static final TimeValue AVERAGE_WINDOW_FIVE_SECOND = TimeValue.timeValueSeconds(30);
public static final TimeValue AVERAGE_WINDOW_ONE_MINUTE = TimeValue.timeValueMinutes(1);
public static final TimeValue AVERAGE_WINDOW_FIVE_MINUTE = TimeValue.timeValueMinutes(5);

public static ResourceTrackers resourceTrackers;

public static ResourceTrackers create(ThreadPool threadPool) {
return resourceTrackers = new ResourceTrackers(
new AverageCpuUsageTracker(threadPool, AVERAGE_WINDOW_ONE_SECOND, AVERAGE_WINDOW_ONE_MINUTE),
new AverageCpuUsageTracker(threadPool, AVERAGE_WINDOW_FIVE_SECOND, AVERAGE_WINDOW_FIVE_MINUTE),
new AverageMemoryUsageTracker(threadPool, AVERAGE_WINDOW_ONE_SECOND, AVERAGE_WINDOW_ONE_MINUTE),
new AverageMemoryUsageTracker(threadPool, AVERAGE_WINDOW_FIVE_SECOND, AVERAGE_WINDOW_FIVE_MINUTE)
);
}

/**
* Container for resource usage trackers used in auto force merge operations.
* Provides access to CPU and JVM memory usage trackers with different time windows.
*
* @opensearch.internal
*/
public static class ResourceTrackers {
public final AverageCpuUsageTracker cpuOneMinute;
public final AverageCpuUsageTracker cpuFiveMinute;
public final AverageMemoryUsageTracker jvmOneMinute;
public final AverageMemoryUsageTracker jvmFiveMinute;

/**
* Creates a new ResourceTrackers instance.
*
* @param cpuOneMinute CPU tracker with 1-minute window
* @param cpuFiveMinute CPU tracker with 5-minute window
* @param jvmOneMinute JVM memory tracker with 1-minute window
* @param jvmFiveMinute JVM memory tracker with 5-minute window
*/
ResourceTrackers(
AverageCpuUsageTracker cpuOneMinute,
AverageCpuUsageTracker cpuFiveMinute,
AverageMemoryUsageTracker jvmOneMinute,
AverageMemoryUsageTracker jvmFiveMinute
) {
this.cpuOneMinute = cpuOneMinute;
this.cpuFiveMinute = cpuFiveMinute;
this.jvmOneMinute = jvmOneMinute;
this.jvmFiveMinute = jvmFiveMinute;
}

public void start() {
cpuOneMinute.start();
cpuFiveMinute.start();
jvmOneMinute.start();
jvmFiveMinute.start();
}

public void stop() {
cpuOneMinute.stop();
cpuFiveMinute.stop();
jvmOneMinute.stop();
jvmFiveMinute.stop();
}
}
}
Loading