Merged. Changes from 12 commits.
CHANGELOG.md (1 addition, 0 deletions)
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Rule based auto-tagging] Bug fix for update rule api ([#18488](https://github.com/opensearch-project/OpenSearch/pull/18488))
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082))
+- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
- [Rule based auto-tagging] Add refresh based synchronization service for `Rule`s ([#18128](https://github.com/opensearch-project/OpenSearch/pull/18128))
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))

Large diffs are not rendered by default.

DiskThresholdEvaluator.java (new file)
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.DiskUsage;

/**
* Base interface for disk threshold evaluation logic.
* This interface defines methods for evaluating whether a node exceeds
* various watermarks based on disk usage.
*
* @opensearch.internal
*/
public interface DiskThresholdEvaluator {

/**
* Checks if a node is exceeding the low watermark threshold
*
* @param diskUsage disk usage for the node
* @return true if the node is exceeding the low watermark, false otherwise
*/
boolean isNodeExceedingLowWatermark(DiskUsage diskUsage);

/**
* Checks if a node is exceeding the high watermark threshold
*
* @param diskUsage disk usage for the node
* @return true if the node is exceeding the high watermark, false otherwise
*/
boolean isNodeExceedingHighWatermark(DiskUsage diskUsage);

/**
* Checks if a node is exceeding the flood stage watermark threshold
*
* @param diskUsage disk usage for the node
* @return true if the node is exceeding the flood stage watermark, false otherwise
*/
boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage);

/**
* Get the free space low threshold for a given total space
*
* @param totalSpace total available space
* @return free space low threshold in bytes
*/
long getFreeSpaceLowThreshold(long totalSpace);

/**
* Get the free space high threshold for a given total space
*
* @param totalSpace total available space
* @return free space high threshold in bytes
*/
long getFreeSpaceHighThreshold(long totalSpace);

/**
* Get the free space flood stage threshold for a given total space
*
* @param totalSpace total available space
* @return free space flood stage threshold in bytes
*/
long getFreeSpaceFloodStageThreshold(long totalSpace);
}
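For illustration only, a minimal implementation of this interface might hard-code percentage watermarks and derive each boolean check from the corresponding free-space threshold. The class below is a sketch with invented thresholds, not part of this PR; the PR's actual implementations are HotNodeDiskThresholdEvaluator and WarmNodeDiskThresholdEvaluator, which read their thresholds from DiskThresholdSettings.

// Illustrative sketch only: fixed 85/90/95% used-space watermarks, not the PR's implementation.
package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.DiskUsage;

public class FixedPercentageDiskThresholdEvaluator implements DiskThresholdEvaluator {

    private static final double LOW_WATERMARK = 0.85;   // hypothetical: stop assigning replicas above 85% used
    private static final double HIGH_WATERMARK = 0.90;  // hypothetical: start relocating shards above 90% used
    private static final double FLOOD_WATERMARK = 0.95; // hypothetical: apply write block above 95% used

    @Override
    public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage) {
        return diskUsage.getFreeBytes() < getFreeSpaceLowThreshold(diskUsage.getTotalBytes());
    }

    @Override
    public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage) {
        return diskUsage.getFreeBytes() < getFreeSpaceHighThreshold(diskUsage.getTotalBytes());
    }

    @Override
    public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) {
        return diskUsage.getFreeBytes() < getFreeSpaceFloodStageThreshold(diskUsage.getTotalBytes());
    }

    @Override
    public long getFreeSpaceLowThreshold(long totalSpace) {
        return (long) (totalSpace * (1.0 - LOW_WATERMARK)); // minimum free bytes before the low watermark trips
    }

    @Override
    public long getFreeSpaceHighThreshold(long totalSpace) {
        return (long) (totalSpace * (1.0 - HIGH_WATERMARK));
    }

    @Override
    public long getFreeSpaceFloodStageThreshold(long totalSpace) {
        return (long) (totalSpace * (1.0 - FLOOD_WATERMARK));
    }
}

Keeping the boolean checks and the byte-threshold getters on one interface lets a caller such as DiskThresholdMonitor treat hot and warm nodes uniformly once it has picked an evaluator.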
DiskThresholdMonitor.java
@@ -53,6 +53,7 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
+import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.transport.client.Client;

import java.util.ArrayList;
@@ -68,6 +69,10 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

+import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
+import static org.opensearch.cluster.routing.RoutingPool.getIndexPool;
+import static org.opensearch.cluster.routing.RoutingPool.getNodePool;

/**
* Listens for a node to go over the high watermark and kicks off an empty
* reroute if it does. Also responsible for logging about nodes that have
@@ -81,8 +86,10 @@ public class DiskThresholdMonitor {
private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final Supplier<ClusterState> clusterStateSupplier;
+private final Supplier<Double> dataToFileCacheSizeRatioSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
+private final NodeDiskEvaluator nodeDiskEvaluator;
private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
private final AtomicBoolean checkInProgress = new AtomicBoolean();

@@ -110,13 +117,21 @@ public DiskThresholdMonitor(
ClusterSettings clusterSettings,
Client client,
LongSupplier currentTimeMillisSupplier,
-RerouteService rerouteService
+RerouteService rerouteService,
+Supplier<Double> dataToFileCacheSizeRatioSupplier
) {
this.clusterStateSupplier = clusterStateSupplier;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.rerouteService = rerouteService;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
+DiskThresholdEvaluator hotNodeEvaluator = new HotNodeDiskThresholdEvaluator(diskThresholdSettings);
+DiskThresholdEvaluator warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator(
+diskThresholdSettings,
+dataToFileCacheSizeRatioSupplier
+);
+this.nodeDiskEvaluator = new NodeDiskEvaluator(hotNodeEvaluator, warmNodeEvaluator);
+this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier;
}
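NodeDiskEvaluator itself does not appear in this diff. Judging from how it is constructed above and invoked in onNewInfo below (every check takes an isWarmNode flag), a plausible sketch of its delegation logic is the following; the method names mirror the call sites in this file, everything else is an assumption.

// Sketch, assuming NodeDiskEvaluator simply routes each check to the hot or warm evaluator.
package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.DiskUsage;

public class NodeDiskEvaluator {

    private final DiskThresholdEvaluator hotNodeEvaluator;
    private final DiskThresholdEvaluator warmNodeEvaluator;

    public NodeDiskEvaluator(DiskThresholdEvaluator hotNodeEvaluator, DiskThresholdEvaluator warmNodeEvaluator) {
        this.hotNodeEvaluator = hotNodeEvaluator;
        this.warmNodeEvaluator = warmNodeEvaluator;
    }

    public boolean isNodeExceedingLowWatermark(DiskUsage usage, boolean isWarmNode) {
        return (isWarmNode ? warmNodeEvaluator : hotNodeEvaluator).isNodeExceedingLowWatermark(usage);
    }

    public boolean isNodeExceedingHighWatermark(DiskUsage usage, boolean isWarmNode) {
        return (isWarmNode ? warmNodeEvaluator : hotNodeEvaluator).isNodeExceedingHighWatermark(usage);
    }

    public boolean isNodeExceedingFloodStageWatermark(DiskUsage usage, boolean isWarmNode) {
        return (isWarmNode ? warmNodeEvaluator : hotNodeEvaluator).isNodeExceedingFloodStageWatermark(usage);
    }
}

Under this reading, the monitor computes the hot/warm flag once per node and otherwise stays free of tier-specific branching.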

private void checkFinished() {
@@ -162,11 +177,20 @@ public void onNewInfo(ClusterInfo info) {

for (final Map.Entry<String, DiskUsage> entry : usages.entrySet()) {
final String node = entry.getKey();
-final DiskUsage usage = entry.getValue();
+DiskUsage usage = entry.getValue();
final RoutingNode routingNode = routingNodes.node(node);
if (routingNode == null) {
continue;
}

-if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()
-|| usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
+// Only for Dedicated Warm Nodes
+final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode));
+if (isWarmNode) {
+// Create DiskUsage for Warm Nodes based on total Addressable Space
+usage = getWarmDiskUsage(usage, info, routingNode, state);
+}
+
+if (nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage, isWarmNode)) {

nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);
@@ -189,8 +213,7 @@ public void onNewInfo(ClusterInfo info) {
continue;
}

-if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
-|| usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usage, isWarmNode)) {

if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
for (ShardRouting routing : routingNode) {
@@ -209,8 +232,7 @@ public void onNewInfo(ClusterInfo info) {
Math.max(0L, usage.getFreeBytes() - reservedSpace)
);

-if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
-|| usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageWithReservedSpace, isWarmNode)) {

nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);
@@ -228,61 +250,60 @@
);
}

-} else if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()
-|| usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
+} else if (nodeDiskEvaluator.isNodeExceedingLowWatermark(usage, isWarmNode)) {

nodesOverHighThresholdAndRelocating.remove(node);

final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node);
final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node);
assert (wasUnderLowThreshold && wasOverHighThreshold) == false;

if (wasUnderLowThreshold) {
logger.info(
"low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
diskThresholdSettings.describeLowThreshold(),
usage
);
} else if (wasOverHighThreshold) {
logger.info(
"high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded",
diskThresholdSettings.describeHighThreshold(),
usage,
diskThresholdSettings.describeLowThreshold()
);
}

} else {

nodesOverHighThresholdAndRelocating.remove(node);

if (nodesOverLowThreshold.contains(node)) {
// The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more
// shards if we reroute now.
if (lastRunTimeMillis.get() <= currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true;
explanation = "one or more nodes has gone under the high or low watermark";
nodesOverLowThreshold.remove(node);
nodesOverHighThreshold.remove(node);

logger.info(
"low disk watermark [{}] no longer exceeded on {}",
diskThresholdSettings.describeLowThreshold(),
usage
);

} else {
logger.debug(
"{} has gone below a disk threshold, but an automatic reroute has occurred "
+ "in the last [{}], skipping reroute",
node,
diskThresholdSettings.getRerouteInterval()
);
}
}

}

}

final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4);
@@ -309,8 +330,8 @@ public void onNewInfo(ClusterInfo info) {
relocatingShardsSize = 0L;
}

-if (usageIncludingRelocations.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
-|| usageIncludingRelocations.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+boolean isNodeWarm = REMOTE_CAPABLE.equals(getNodePool(routingNode));
+if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageIncludingRelocations, isNodeWarm)) {

nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId());
logger.warn(
@@ -413,6 +434,29 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
);
}

+private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) {
+double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get();
+AggregateFileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null);
+final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
+// Parenthesize the product so a fractional ratio is not truncated before multiplying
+long totalAddressableSpace = (long) (dataToFileCacheSizeRatio * nodeCacheSize);
+final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
+.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getIndexPool(state.metadata().getIndexSafe(shard.index()))))
+.collect(Collectors.toList());
+
+long remoteShardSize = 0L;
+for (ShardRouting shard : remoteShardsOnNode) {
+remoteShardSize += DiskThresholdDecider.getExpectedShardSize(shard, 0L, info, null, state.metadata(), state.getRoutingTable());
+}
+final DiskUsage warmDiskUsage = new DiskUsage(
+diskUsage.getNodeId(),
+diskUsage.getNodeName(),
+diskUsage.getPath(),
+totalAddressableSpace,
+totalAddressableSpace - remoteShardSize
+);
+return warmDiskUsage;
+}
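As a rough worked example of what getWarmDiskUsage computes (all numbers invented): with a data-to-file-cache ratio of 5.0 and a 100 GiB file cache, the warm node advertises 500 GiB of total addressable space; if the primaries of remote-capable indices on the node are expected to total 450 GiB, the synthetic DiskUsage reports 50 GiB free, i.e. 10%, which the warm evaluator then compares against its watermarks.

// Hypothetical numbers, mirroring the arithmetic in getWarmDiskUsage above.
public class WarmDiskUsageExample {
    public static void main(String[] args) {
        double dataToFileCacheSizeRatio = 5.0;                // from dataToFileCacheSizeRatioSupplier.get()
        long nodeCacheSize = 100L * 1024 * 1024 * 1024;       // 100 GiB file cache on the warm node
        long totalAddressableSpace = (long) (dataToFileCacheSizeRatio * nodeCacheSize); // 500 GiB
        long remoteShardSize = 450L * 1024 * 1024 * 1024;     // expected bytes of primary remote shards
        long freeBytes = totalAddressableSpace - remoteShardSize; // 50 GiB reported as free
        System.out.println("total=" + totalAddressableSpace + " free=" + freeBytes);
    }
}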

private void markNodesMissingUsageIneligibleForRelease(
RoutingNodes routingNodes,
Map<String, DiskUsage> usages,