From d73241fb2be12bc3dbcb29f3efcd5d29d2acc831 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Tue, 27 May 2025 20:05:30 +0530 Subject: [PATCH 01/12] Support Warm Index Write Block on flood Watermark breach Signed-off-by: Gagan Singh Saini --- .../allocation/DiskThresholdMonitor.java | 251 ++++++++++++++---- .../main/java/org/opensearch/node/Node.java | 3 +- .../allocation/DiskThresholdMonitorTests.java | 24 +- 3 files changed, 214 insertions(+), 64 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index 4b01fc7a2dba3..1958195137742 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -53,6 +53,8 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.transport.client.Client; import java.util.ArrayList; @@ -68,6 +70,10 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; +import static org.opensearch.cluster.routing.RoutingPool.getIndexPool; +import static org.opensearch.cluster.routing.RoutingPool.getNodePool; + /** * Listens for a node to go over the high watermark and kicks off an empty * reroute if it does. 
Also responsible for logging about nodes that have @@ -81,6 +87,7 @@ public class DiskThresholdMonitor { private final DiskThresholdSettings diskThresholdSettings; private final Client client; private final Supplier clusterStateSupplier; + private final Supplier dataToFileCacheSizeRatioSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); @@ -110,13 +117,15 @@ public DiskThresholdMonitor( ClusterSettings clusterSettings, Client client, LongSupplier currentTimeMillisSupplier, - RerouteService rerouteService + RerouteService rerouteService, + Supplier dataToFileCacheSizeRatioSupplier ) { this.clusterStateSupplier = clusterStateSupplier; this.currentTimeMillisSupplier = currentTimeMillisSupplier; this.rerouteService = rerouteService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; + this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier; } private void checkFinished() { @@ -162,11 +171,19 @@ public void onNewInfo(ClusterInfo info) { for (final Map.Entry entry : usages.entrySet()) { final String node = entry.getKey(); - final DiskUsage usage = entry.getValue(); + // Create DiskUsage for Warm Nodes based on total Addressable Space + DiskUsage usage = entry.getValue(); final RoutingNode routingNode = routingNodes.node(node); + if (routingNode == null) { + continue; + } + + final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode)); + if (isWarmNode) { + usage = getWarmDiskUsage(usage, info, routingNode, state); + } - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() - || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { + if (isNodeExceedingFloodStageWatermark(usage, isWarmNode)) { nodesOverLowThreshold.add(node); nodesOverHighThreshold.add(node); @@ -189,8 +206,7 @@ 
public void onNewInfo(ClusterInfo info) { continue; } - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() - || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + if (isNodeExceedingHighWatermark(usage, isWarmNode)) { if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step for (ShardRouting routing : routingNode) { @@ -200,6 +216,7 @@ public void onNewInfo(ClusterInfo info) { } } + // Check if for warm node if reserved space comes out to be zero, if not make it 0 final long reservedSpace = info.getReservedSpace(usage.getNodeId(), usage.getPath()).getTotal(); final DiskUsage usageWithReservedSpace = new DiskUsage( usage.getNodeId(), @@ -209,8 +226,8 @@ public void onNewInfo(ClusterInfo info) { Math.max(0L, usage.getFreeBytes() - reservedSpace) ); - if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() - || usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + // Check if node exceeds high watermark with reserved space factored in + if (isNodeExceedingHighWatermark(usageWithReservedSpace, isWarmNode)) { nodesOverLowThreshold.add(node); nodesOverHighThreshold.add(node); @@ -228,61 +245,60 @@ public void onNewInfo(ClusterInfo info) { ); } - } else if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() - || usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { + } else if (isNodeExceedingLowWatermark(usage, isWarmNode)) { - nodesOverHighThresholdAndRelocating.remove(node); + nodesOverHighThresholdAndRelocating.remove(node); - final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node); - final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node); - assert (wasUnderLowThreshold && wasOverHighThreshold) == false; + 
final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node); + final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node); + assert (wasUnderLowThreshold && wasOverHighThreshold) == false; + + if (wasUnderLowThreshold) { + logger.info( + "low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", + diskThresholdSettings.describeLowThreshold(), + usage + ); + } else if (wasOverHighThreshold) { + logger.info( + "high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded", + diskThresholdSettings.describeHighThreshold(), + usage, + diskThresholdSettings.describeLowThreshold() + ); + } + + } else { + + nodesOverHighThresholdAndRelocating.remove(node); + + if (nodesOverLowThreshold.contains(node)) { + // The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more + // shards + // if we reroute now. + if (lastRunTimeMillis.get() <= currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { + reroute = true; + explanation = "one or more nodes has gone under the high or low watermark"; + nodesOverLowThreshold.remove(node); + nodesOverHighThreshold.remove(node); - if (wasUnderLowThreshold) { logger.info( - "low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", + "low disk watermark [{}] no longer exceeded on {}", diskThresholdSettings.describeLowThreshold(), usage ); - } else if (wasOverHighThreshold) { - logger.info( - "high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded", - diskThresholdSettings.describeHighThreshold(), - usage, - diskThresholdSettings.describeLowThreshold() - ); - } - } else { - - nodesOverHighThresholdAndRelocating.remove(node); - - if (nodesOverLowThreshold.contains(node)) { - // The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more - // shards - // if we reroute now. 
- if (lastRunTimeMillis.get() <= currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { - reroute = true; - explanation = "one or more nodes has gone under the high or low watermark"; - nodesOverLowThreshold.remove(node); - nodesOverHighThreshold.remove(node); - - logger.info( - "low disk watermark [{}] no longer exceeded on {}", - diskThresholdSettings.describeLowThreshold(), - usage - ); - - } else { - logger.debug( - "{} has gone below a disk threshold, but an automatic reroute has occurred " - + "in the last [{}], skipping reroute", - node, - diskThresholdSettings.getRerouteInterval() - ); - } + } else { + logger.debug( + "{} has gone below a disk threshold, but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", + node, + diskThresholdSettings.getRerouteInterval() + ); } - } + + } } final ActionListener listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4); @@ -309,8 +325,8 @@ public void onNewInfo(ClusterInfo info) { relocatingShardsSize = 0L; } - if (usageIncludingRelocations.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() - || usageIncludingRelocations.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + boolean isNodeWarm = REMOTE_CAPABLE.equals(getNodePool(routingNode)); + if (isNodeExceedingHighWatermark(usageIncludingRelocations, isNodeWarm)) { nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId()); logger.warn( @@ -413,6 +429,131 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste ); } + private boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage, boolean isWarmNode) { + if (!isWarmNode) { + return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() + || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage(); + } else { + if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + return false; 
+ } + long totalBytes = diskUsage.getTotalBytes(); + long freeSpace = diskUsage.getFreeBytes(); + long freeSpaceFloodStageThreshold = calculateFreeSpaceFloodStageThreshold(totalBytes); + + return freeSpace < freeSpaceFloodStageThreshold; + } + } + + private boolean isNodeExceedingHighWatermark(DiskUsage diskUsage, boolean isWarmNode) { + if (!isWarmNode) { + return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() + || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh(); + } else { + if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + return false; + } + long totalBytes = diskUsage.getTotalBytes(); + long freeSpace = diskUsage.getFreeBytes(); + long freeSpaceHighThreshold = calculateFreeSpaceHighThreshold(totalBytes); + + return freeSpace < freeSpaceHighThreshold; + } + } + + private boolean isNodeExceedingLowWatermark(DiskUsage diskUsage, boolean isWarmNode) { + if (!isWarmNode) { + return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() + || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow(); + } else { + if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + return false; + } + long totalBytes = diskUsage.getTotalBytes(); + long freeSpace = diskUsage.getFreeBytes(); + long freeSpaceHighThreshold = calculateFreeSpaceLowThreshold(totalBytes); + + return freeSpace < freeSpaceHighThreshold; + } + } + + private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) { + double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + FileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null); + final long nodeCacheSize = fileCacheStats != null ? 
fileCacheStats.getTotal().getBytes() : 0; + long totalAddressableSpace = (long) dataToFileCacheSizeRatio * nodeCacheSize; + final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) + .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getIndexPool(state.metadata().getIndexSafe(shard.index())))) + .collect(Collectors.toList()); + + var remoteShardSize = 0L; + for (ShardRouting shard : remoteShardsOnNode) { + remoteShardSize += DiskThresholdDecider.getExpectedShardSize(shard, 0L, info, null, state.metadata(), state.getRoutingTable()); + } + final DiskUsage warmDiskUsage = new DiskUsage( + diskUsage.getNodeId(), + diskUsage.getNodeName(), + diskUsage.getPath(), + totalAddressableSpace, + totalAddressableSpace - remoteShardSize + ); + return warmDiskUsage; + } + + private long calculateFreeSpaceLowThreshold(long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdLow(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + + private long calculateFreeSpaceHighThreshold(long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + ByteSizeValue bytesThreshold = 
diskThresholdSettings.getFreeBytesThresholdHigh(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + + private long calculateFreeSpaceFloodStageThreshold(long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdFloodStage(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdFloodStage(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + private void markNodesMissingUsageIneligibleForRelease( RoutingNodes routingNodes, Map usages, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 25a43aa635127..49637da795406 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1338,7 +1338,8 @@ protected Node(final Environment initialEnvironment, Collection clas clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, - rerouteService + rerouteService, + new FileCacheSettings(settings, clusterService.getClusterSettings())::getRemoteDataRatio ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 6ab57d10b05c1..ac5f6e277e649 100644 --- 
a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -125,7 +125,8 @@ public void testMarkFloodStageIndicesReadOnly() { assertTrue(reroute.compareAndSet(false, true)); assertThat(priority, equalTo(Priority.HIGH)); listener.onResponse(null); - } + }, + () -> 5.0 ) { @Override @@ -187,7 +188,8 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC assertTrue(reroute.compareAndSet(false, true)); assertThat(priority, equalTo(Priority.HIGH)); listener.onResponse(null); - } + }, + () -> 5.0 ) { @Override protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener, boolean readOnly) { @@ -228,7 +230,8 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() { assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); assertTrue(listenerReference.compareAndSet(null, listener)); - } + }, + () -> 5.0 ) { @Override protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener, boolean readOnly) { @@ -369,7 +372,8 @@ public void testAutoReleaseIndices() { assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); listener.onResponse(clusterState); - } + }, + () -> 5.0 ) { @Override protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { @@ -431,7 +435,8 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); listener.onResponse(clusterStateWithBlocks); - } + }, + () -> 5.0 ) { @Override protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { @@ -544,7 +549,8 @@ public long getAsLong() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, timeSupplier, - (reason, priority, listener) -> 
listener.onResponse(clusterStateRef.get()) + (reason, priority, listener) -> listener.onResponse(clusterStateRef.get()), + () -> 5.0 ) { @Override protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener, boolean readOnly) { @@ -696,7 +702,8 @@ public void testIndexCreateBlockWhenNoDataNodeHealthy() { assertTrue(reroute.compareAndSet(false, true)); assertThat(priority, equalTo(Priority.HIGH)); listener.onResponse(null); - } + }, + () -> 5.0 ) { @Override @@ -773,7 +780,8 @@ public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() { currentTime::get, (reason, priority, listener) -> { listener.onResponse(null); - } + }, + () -> 5.0 ) { @Override From 1f98e959678c3980c246ad4643d484a9c0245e57 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Sun, 8 Jun 2025 17:04:04 +0530 Subject: [PATCH 02/12] Refactoring Disk Threshold Evaluation for hot and warm nodes Signed-off-by: Gagan Singh Saini --- .../allocation/DiskThresholdEvaluator.java | 69 ++++++++++ .../allocation/DiskThresholdMonitor.java | 108 ++------------- .../HotNodeDiskThresholdEvaluator.java | 103 +++++++++++++++ .../WarmNodeDiskThresholdEvaluator.java | 125 ++++++++++++++++++ .../decider/DiskThresholdDecider.java | 3 - .../decider/WarmDiskThresholdDecider.java | 44 +----- 6 files changed, 315 insertions(+), 137 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java new file mode 100644 index 0000000000000..4855158ca0cb8 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.DiskUsage; + +/** + * Base interface for disk threshold evaluation logic. + * This interface defines methods for evaluating whether a node exceeds + * various watermarks based on disk usage. + * + * @opensearch.internal + */ +public interface DiskThresholdEvaluator { + + /** + * Checks if a node is exceeding the low watermark threshold + * + * @param diskUsage disk usage for the node + * @return true if the node is exceeding the low watermark, false otherwise + */ + boolean isNodeExceedingLowWatermark(DiskUsage diskUsage); + + /** + * Checks if a node is exceeding the high watermark threshold + * + * @param diskUsage disk usage for the node + * @return true if the node is exceeding the high watermark, false otherwise + */ + boolean isNodeExceedingHighWatermark(DiskUsage diskUsage); + + /** + * Checks if a node is exceeding the flood stage watermark threshold + * + * @param diskUsage disk usage for the node + * @return true if the node is exceeding the flood stage watermark, false otherwise + */ + boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage); + + /** + * Calculates the free space low threshold for a given total space + * + * @param totalSpace total available space + * @return free space low threshold in bytes + */ + long calculateFreeSpaceLowThreshold(long totalSpace); + + /** + * Calculates the free space high threshold for a given total space + * + * @param totalSpace total available space + * @return free space high threshold in bytes + */ + long calculateFreeSpaceHighThreshold(long totalSpace); + + /** + * Calculates the free space flood stage 
threshold for a given total space + * + * @param totalSpace total available space + * @return free space flood stage threshold in bytes + */ + long calculateFreeSpaceFloodStageThreshold(long totalSpace); +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index 1958195137742..ff8f69b94f999 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -53,8 +53,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.index.store.remote.filecache.FileCacheStats; +import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats; import org.opensearch.transport.client.Client; import java.util.ArrayList; @@ -90,6 +89,8 @@ public class DiskThresholdMonitor { private final Supplier dataToFileCacheSizeRatioSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; + private final HotNodeDiskThresholdEvaluator hotNodeEvaluator; + private final WarmNodeDiskThresholdEvaluator warmNodeEvaluator; private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); private final AtomicBoolean checkInProgress = new AtomicBoolean(); @@ -125,6 +126,8 @@ public DiskThresholdMonitor( this.rerouteService = rerouteService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; + this.hotNodeEvaluator = new HotNodeDiskThresholdEvaluator(diskThresholdSettings); + this.warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier); this.dataToFileCacheSizeRatioSupplier = 
dataToFileCacheSizeRatioSupplier; } @@ -430,56 +433,23 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste } private boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage, boolean isWarmNode) { - if (!isWarmNode) { - return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() - || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage(); - } else { - if (dataToFileCacheSizeRatioSupplier.get() <= 0) { - return false; - } - long totalBytes = diskUsage.getTotalBytes(); - long freeSpace = diskUsage.getFreeBytes(); - long freeSpaceFloodStageThreshold = calculateFreeSpaceFloodStageThreshold(totalBytes); - - return freeSpace < freeSpaceFloodStageThreshold; - } + DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator; + return evaluator.isNodeExceedingFloodStageWatermark(diskUsage); } private boolean isNodeExceedingHighWatermark(DiskUsage diskUsage, boolean isWarmNode) { - if (!isWarmNode) { - return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() - || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh(); - } else { - if (dataToFileCacheSizeRatioSupplier.get() <= 0) { - return false; - } - long totalBytes = diskUsage.getTotalBytes(); - long freeSpace = diskUsage.getFreeBytes(); - long freeSpaceHighThreshold = calculateFreeSpaceHighThreshold(totalBytes); - - return freeSpace < freeSpaceHighThreshold; - } + DiskThresholdEvaluator evaluator = isWarmNode ? 
warmNodeEvaluator : hotNodeEvaluator; + return evaluator.isNodeExceedingHighWatermark(diskUsage); } private boolean isNodeExceedingLowWatermark(DiskUsage diskUsage, boolean isWarmNode) { - if (!isWarmNode) { - return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() - || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow(); - } else { - if (dataToFileCacheSizeRatioSupplier.get() <= 0) { - return false; - } - long totalBytes = diskUsage.getTotalBytes(); - long freeSpace = diskUsage.getFreeBytes(); - long freeSpaceHighThreshold = calculateFreeSpaceLowThreshold(totalBytes); - - return freeSpace < freeSpaceHighThreshold; - } + DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator; + return evaluator.isNodeExceedingLowWatermark(diskUsage); } private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) { double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - FileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null); + AggregateFileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null); final long nodeCacheSize = fileCacheStats != null ? 
fileCacheStats.getTotal().getBytes() : 0; long totalAddressableSpace = (long) dataToFileCacheSizeRatio * nodeCacheSize; final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) @@ -500,60 +470,6 @@ private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, Routin return warmDiskUsage; } - private long calculateFreeSpaceLowThreshold(long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdLow(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; - } - - private long calculateFreeSpaceHighThreshold(long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; - } - - private long calculateFreeSpaceFloodStageThreshold(long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdFloodStage(); - if (percentageThreshold > 0) { - return (long) 
(totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdFloodStage(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; - } - private void markNodesMissingUsageIneligibleForRelease( RoutingNodes routingNodes, Map usages, diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java new file mode 100644 index 0000000000000..55340444275fc --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.DiskUsage; +import org.opensearch.core.common.unit.ByteSizeValue; + +/** + * DiskThresholdEvaluator implementation for hot data nodes. + * This evaluator uses standard disk usage metrics and thresholds + * for determining if a node exceeds watermarks. 
+ * + * @opensearch.internal + */ +public class HotNodeDiskThresholdEvaluator implements DiskThresholdEvaluator { + + private final DiskThresholdSettings diskThresholdSettings; + + public HotNodeDiskThresholdEvaluator(DiskThresholdSettings diskThresholdSettings) { + this.diskThresholdSettings = diskThresholdSettings; + } + + @Override + public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage) { + return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() + || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow(); + } + + @Override + public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage) { + return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() + || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh(); + } + + @Override + public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) { + return diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() + || diskUsage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage(); + } + + @Override + public long calculateFreeSpaceLowThreshold(long totalSpace) { + // For hot data nodes, we use the standard disk threshold settings + // Check for absolute bytes threshold first + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes(); + } + + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdLow(); + if (percentageThreshold > 0) { + return (long) (totalSpace * percentageThreshold / 100.0); + } + + // Default fallback + return 0; + } + + @Override + public long calculateFreeSpaceHighThreshold(long totalSpace) { + // For hot data nodes, we use the standard disk threshold settings + // Check for absolute bytes threshold 
first + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes(); + } + + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); + if (percentageThreshold > 0) { + return (long) (totalSpace * percentageThreshold / 100.0); + } + + // Default fallback + return 0; + } + + @Override + public long calculateFreeSpaceFloodStageThreshold(long totalSpace) { + // For hot data nodes, we use the standard disk threshold settings + // Check for absolute bytes threshold first + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdFloodStage(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes(); + } + + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdFloodStage(); + if (percentageThreshold > 0) { + return (long) (totalSpace * percentageThreshold / 100.0); + } + + // Default fallback + return 0; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java new file mode 100644 index 0000000000000..950defdb3eb83 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.DiskUsage; +import org.opensearch.core.common.unit.ByteSizeValue; + +import java.util.function.Supplier; + +/** + * DiskThresholdEvaluator implementation for warm nodes. 
+ * This evaluator uses file cache size ratio and addressable space + * calculations specific to warm nodes for determining if a node exceeds watermarks. + * + * @opensearch.internal + */ +public class WarmNodeDiskThresholdEvaluator implements DiskThresholdEvaluator { + + private final DiskThresholdSettings diskThresholdSettings; + private final Supplier dataToFileCacheSizeRatioSupplier; + + public WarmNodeDiskThresholdEvaluator(DiskThresholdSettings diskThresholdSettings, Supplier dataToFileCacheSizeRatioSupplier) { + this.diskThresholdSettings = diskThresholdSettings; + this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier; + } + + @Override + public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage) { + if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + return false; + } + long totalBytes = diskUsage.getTotalBytes(); + long freeSpace = diskUsage.getFreeBytes(); + long freeSpaceLowThreshold = calculateFreeSpaceLowThreshold(totalBytes); + + return freeSpace < freeSpaceLowThreshold; + } + + @Override + public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage) { + if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + return false; + } + long totalBytes = diskUsage.getTotalBytes(); + long freeSpace = diskUsage.getFreeBytes(); + long freeSpaceHighThreshold = calculateFreeSpaceHighThreshold(totalBytes); + + return freeSpace < freeSpaceHighThreshold; + } + + @Override + public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) { + if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + return false; + } + long totalBytes = diskUsage.getTotalBytes(); + long freeSpace = diskUsage.getFreeBytes(); + long freeSpaceFloodStageThreshold = calculateFreeSpaceFloodStageThreshold(totalBytes); + + return freeSpace < freeSpaceFloodStageThreshold; + } + + @Override + public long calculateFreeSpaceLowThreshold(long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = 
diskThresholdSettings.getFreeDiskThresholdLow(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + + @Override + public long calculateFreeSpaceHighThreshold(long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + + @Override + public long calculateFreeSpaceFloodStageThreshold(long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdFloodStage(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdFloodStage(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } +} diff --git 
a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index b6715e3ecb458..7bf8f86537c37 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -53,7 +53,6 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.store.remote.filecache.FileCacheSettings; import org.opensearch.snapshots.SnapshotShardSizeInfo; import java.util.List; @@ -100,13 +99,11 @@ public class DiskThresholdDecider extends AllocationDecider { private final DiskThresholdSettings diskThresholdSettings; private final boolean enableForSingleDataNode; - private final FileCacheSettings fileCacheSettings; public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) { this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9"; this.enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings); - this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java index c5db09fcbd608..0274d9be3faac 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java @@ -38,8 +38,10 @@ import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.routing.RoutingNode; import 
org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.DiskThresholdEvaluator; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.WarmNodeDiskThresholdEvaluator; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeValue; @@ -91,12 +93,14 @@ public class WarmDiskThresholdDecider extends AllocationDecider { private final FileCacheSettings fileCacheSettings; private final DiskThresholdSettings diskThresholdSettings; private final boolean enableForSingleDataNode; + private final DiskThresholdEvaluator diskThresholdEvaluator; public WarmDiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) { this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings); this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9"; this.enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings); + this.diskThresholdEvaluator = new WarmNodeDiskThresholdEvaluator(diskThresholdSettings, fileCacheSettings::getRemoteDataRatio); } @Override @@ -125,7 +129,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, false); final long freeSpace = totalAddressableSpace - currentNodeRemoteShardSize; final long freeSpaceAfterAllocation = freeSpace > shardSize ? 
freeSpace - shardSize : 0; - final long freeSpaceLowThreshold = calculateFreeSpaceLowThreshold(diskThresholdSettings, totalAddressableSpace); + final long freeSpaceLowThreshold = diskThresholdEvaluator.calculateFreeSpaceLowThreshold(totalAddressableSpace); final ByteSizeValue freeSpaceLowThresholdInByteSize = new ByteSizeValue(freeSpaceLowThreshold); final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); @@ -185,7 +189,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, true); final long freeSpace = totalAddressableSpace - currentNodeRemoteShardSize; - final long freeSpaceHighThreshold = calculateFreeSpaceHighThreshold(diskThresholdSettings, totalAddressableSpace); + final long freeSpaceHighThreshold = diskThresholdEvaluator.calculateFreeSpaceHighThreshold(totalAddressableSpace); final ByteSizeValue freeSpaceHighThresholdInByteSize = new ByteSizeValue(freeSpaceHighThreshold); final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); @@ -216,42 +220,6 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl ); } - private long calculateFreeSpaceLowThreshold(DiskThresholdSettings diskThresholdSettings, long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdLow(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; - } - - private long 
calculateFreeSpaceHighThreshold(DiskThresholdSettings diskThresholdSettings, long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; - } - private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAllocation allocation, boolean subtractLeavingShards) { final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) .filter( From 341e34a579700d3550afa0ce46482f172191d9da Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Mon, 9 Jun 2025 18:22:28 +0530 Subject: [PATCH 03/12] Add unit tests for warm node disk threshold monitor Signed-off-by: Gagan Singh Saini --- .../allocation/DiskThresholdMonitorTests.java | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index ac5f6e277e649..d17836dffae04 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -43,6 +43,8 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import 
org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingTable; @@ -52,6 +54,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats; +import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; @@ -833,6 +837,243 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC assertEquals(countUnblockBlocksCalled.get(), 1); } + public void testWarmNodeLowStageWatermarkBreach() { + AllocationService allocation = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("remote_index") + .settings( + settings(Version.CURRENT).put("index.store.type", "remote_snapshot") + .put("index.routing.allocation.require._id", "warm_node") + ) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("remote_index")).build(); + + DiscoveryNode warmNode = newNode("warm_node", Collections.singleton(DiscoveryNodeRole.WARM_ROLE)); + + final ClusterState clusterState = applyStartedShardsUntilNoChange( + ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(warmNode)) + .build(), + allocation + ); + + AtomicReference> indicesToMarkReadOnly = new AtomicReference<>(); + AtomicBoolean reroute = new AtomicBoolean(false); + + DiskThresholdMonitor monitor = new DiskThresholdMonitor( + Settings.EMPTY, + () -> clusterState, + new 
ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + () -> 0L, + (reason, priority, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + assertThat(priority, equalTo(Priority.HIGH)); + listener.onResponse(null); + }, + () -> 2.0 // dataToFileCacheSizeRatio = 2 + ) { + @Override + protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { + assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate)); + assertTrue(readOnly); + listener.onResponse(null); + } + + @Override + protected void setIndexCreateBlock(ActionListener listener, boolean indexCreateBlock) { + listener.onResponse(null); + } + }; + + // Test warm node exceeding low stage watermark + // Total addressable space = dataToFileCacheSizeRatio * nodeCacheSize = 2.0 * 100 = 200 + // Low stage threshold (15%) = 200 * 0.15 = 30 + // Free space = 28 < 30, so should exceed low stage + Map diskUsages = new HashMap<>(); + diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8)); + + Map fileCacheStats = new HashMap<>(); + fileCacheStats.put("warm_node", createAggregateFileCacheStats(100)); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[remote_index][0][p]", 172L); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), fileCacheStats); + + monitor.onNewInfo(clusterInfo); + + // Should not mark remote indices read-only due to low stage breach + // and should not trigger reroute + assertNull(indicesToMarkReadOnly.get()); + assertFalse(reroute.get()); + } + + public void testWarmNodeHighStageWatermarkBreach() { + AllocationService allocation = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("remote_index") + .settings( + settings(Version.CURRENT).put("index.store.type", "remote_snapshot")
+ .put("index.routing.allocation.require._id", "warm_node") + ) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("remote_index")).build(); + + DiscoveryNode warmNode = newNode("warm_node", Collections.singleton(DiscoveryNodeRole.WARM_ROLE)); + + final ClusterState clusterState = applyStartedShardsUntilNoChange( + ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(warmNode)) + .build(), + allocation + ); + + AtomicReference> indicesToMarkReadOnly = new AtomicReference<>(); + AtomicBoolean reroute = new AtomicBoolean(false); + + DiskThresholdMonitor monitor = new DiskThresholdMonitor( + Settings.EMPTY, + () -> clusterState, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + () -> 0L, + (reason, priority, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + assertThat(priority, equalTo(Priority.HIGH)); + listener.onResponse(null); + }, + () -> 2.0 // dataToFileCacheSizeRatio = 2 + ) { + @Override + protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { + assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate)); + assertTrue(readOnly); + listener.onResponse(null); + } + + @Override + protected void setIndexCreateBlock(ActionListener listener, boolean indexCreateBlock) { + listener.onResponse(null); + } + }; + + // Test warm node exceeding high stage watermark + // Total addressable space = dataToFileCacheSizeRatio * nodeCacheSize = 2.0 * 100 = 200 + // High stage threshold (10%) = 200 * 0.1 = 20 + // Free space = 18 < 20, so should exceed high stage + Map diskUsages = new HashMap<>(); + diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8)); + + Map fileCacheStats = new HashMap<>(); + 
fileCacheStats.put("warm_node", createAggregateFileCacheStats(100)); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[remote_index][0][p]", 182L); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), fileCacheStats); + + monitor.onNewInfo(clusterInfo); + + // Should not mark remote indices read-only due to High stage breach + // but should trigger reroute + assertNull(indicesToMarkReadOnly.get()); + assertTrue(reroute.get()); + } + + public void testWarmNodeFloodStageWatermarkBreach() { + AllocationService allocation = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("remote_index") + .settings( + settings(Version.CURRENT).put("index.store.type", "remote_snapshot") + .put("index.routing.allocation.require._id", "warm_node") + ) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("remote_index")).build(); + + DiscoveryNode warmNode = newNode("warm_node", Collections.singleton(DiscoveryNodeRole.WARM_ROLE)); + + final ClusterState clusterState = applyStartedShardsUntilNoChange( + ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(warmNode)) + .build(), + allocation + ); + + AtomicReference> indicesToMarkReadOnly = new AtomicReference<>(); + + DiskThresholdMonitor monitor = new DiskThresholdMonitor( + Settings.EMPTY, + () -> clusterState, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + () -> 0L, + (reason, priority, listener) -> listener.onResponse(null), + () -> 2.0 // dataToFileCacheSizeRatio = 2 + ) { + @Override + protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { + 
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate)); + assertTrue(readOnly); + listener.onResponse(null); + } + + @Override + protected void setIndexCreateBlock(ActionListener listener, boolean indexCreateBlock) { + listener.onResponse(null); + } + }; + + // Test warm node exceeding flood stage watermark + // Total addressable space = dataToFileCacheSizeRatio * nodeCacheSize = 2.0 * 100 = 200 + // Flood stage threshold (5%) = 200 * 0.05 = 10 + // Free space = 8 < 10, so should exceed flood stage + Map diskUsages = new HashMap<>(); + diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8)); + + Map fileCacheStats = new HashMap<>(); + fileCacheStats.put("warm_node", createAggregateFileCacheStats(100)); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[remote_index][0][p]", 192L); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), fileCacheStats); + + monitor.onNewInfo(clusterInfo); + + // Should mark remote indices read-only due to flood stage breach + assertNotNull(indicesToMarkReadOnly.get()); + assertTrue(indicesToMarkReadOnly.get().contains("remote_index")); + } + private void assertNoLogging(DiskThresholdMonitor monitor, final Map diskUsages) throws IllegalAccessException { try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) { mockAppender.addExpectation( @@ -913,4 +1154,48 @@ private static ClusterInfo clusterInfo( return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of()); } + private static AggregateFileCacheStats createAggregateFileCacheStats(long totalCacheSize) { + FileCacheStats overallStats = new FileCacheStats( + 0, + totalCacheSize, + 0, + 0, + 0, + 0, + 0, + AggregateFileCacheStats.FileCacheStatsType.OVER_ALL_STATS + ); + FileCacheStats fullStats = new FileCacheStats( + 0, + totalCacheSize, + 0, + 0, + 0, + 0, + 0, + 
AggregateFileCacheStats.FileCacheStatsType.FULL_FILE_STATS + ); + FileCacheStats blockStats = new FileCacheStats( + 0, + totalCacheSize, + 0, + 0, + 0, + 0, + 0, + AggregateFileCacheStats.FileCacheStatsType.BLOCK_FILE_STATS + ); + FileCacheStats pinnedStats = new FileCacheStats( + 0, + totalCacheSize, + 0, + 0, + 0, + 0, + 0, + AggregateFileCacheStats.FileCacheStatsType.PINNED_FILE_STATS + ); + return new AggregateFileCacheStats(System.currentTimeMillis(), overallStats, fullStats, blockStats, pinnedStats); + } + } From e7d8710dffb50c421326af85e964f434fe94804c Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Wed, 11 Jun 2025 18:47:03 +0530 Subject: [PATCH 04/12] Add IT test for warm node disk threshold monitor Signed-off-by: Gagan Singh Saini --- .../decider/DiskThresholdDeciderIT.java | 301 +++++++++++++----- .../allocation/DiskThresholdEvaluator.java | 12 +- .../allocation/DiskThresholdMonitor.java | 7 +- .../HotNodeDiskThresholdEvaluator.java | 64 ++-- .../WarmNodeDiskThresholdEvaluator.java | 95 +++--- .../decider/WarmDiskThresholdDecider.java | 4 +- 6 files changed, 290 insertions(+), 193 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index fc126613ce34e..16c8ff5108d43 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -32,6 +32,9 @@ package org.opensearch.cluster.routing.allocation.decider; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.apache.lucene.tests.mockfile.FilterFileStore; import 
org.apache.lucene.tests.mockfile.FilterFileSystemProvider; import org.apache.lucene.tests.mockfile.FilterPath; @@ -57,20 +60,25 @@ import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.PathUtilsForTesting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsService; +import org.opensearch.node.Node; import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotState; import org.opensearch.test.InternalSettingsPlugin; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; @@ -93,11 +101,14 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.opensearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING; +import static 
org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; @@ -106,13 +117,34 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class DiskThresholdDeciderIT extends OpenSearchIntegTestCase { +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@ParameterizedStaticSettingsOpenSearchIntegTestCase.ClusterScope(scope = ParameterizedStaticSettingsOpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class DiskThresholdDeciderIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public DiskThresholdDeciderIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } private static TestFileSystemProvider fileSystemProvider; private FileSystem defaultFileSystem; + protected static final String BASE_REMOTE_REPO = "test-rs-repo" + "__rs"; + protected Path remoteRepoPath; + + @Before + public void setup() { + remoteRepoPath = randomRepoPath().toAbsolutePath(); + } + @Before public void installFilesystemProvider() { assertNull(defaultFileSystem); @@ -151,12 +183,23 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), WATERMARK_BYTES + "b") .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b") 
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "0ms") + .put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 1) .build(); } @Override protected Collection> nodePlugins() { - return List.of(InternalSettingsPlugin.class, MockInternalClusterInfoService.TestPlugin.class); + var pluginList = List.of( + InternalSettingsPlugin.class, + MockInternalClusterInfoService.TestPlugin.class, + MockFsRepositoryPlugin.class + ); + return Stream.concat(super.nodePlugins().stream(), pluginList.stream()).collect(Collectors.toList()); + } + + @Override + protected boolean addMockIndexStorePlugin() { + return WRITABLE_WARM_INDEX_SETTING.get(settings) == false; } public void testHighWatermarkNotExceeded() throws Exception { @@ -187,18 +230,14 @@ public void testHighWatermarkNotExceeded() throws Exception { } public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception { - final Settings settings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .build(); - - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); + Settings nodeSettings = buildTestSettings(false, null); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + var nodeNames = startTestNodes(2, nodeSettings); ensureStableCluster(3); - final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Reduce disk space of all node until all of them is breaching high disk watermark. 
- clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) - ); + + createTestIndices(nodeNames); + simulateDiskPressure(getMockInternalClusterInfoService()); + assertBusy(() -> { ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState(); assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); @@ -206,8 +245,9 @@ public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Excep } public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); + Settings nodeSettings = buildTestSettings(false, null); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + startTestNodes(2, nodeSettings); ensureStableCluster(3); final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster() @@ -220,19 +260,15 @@ public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throw } public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithAutoReleaseEnabled() throws Exception { - final Settings settings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .build(); + final Settings nodeSettings = buildTestSettings(false, null); + internalCluster().startClusterManagerOnlyNode(nodeSettings); - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); + var nodeNames = startTestNodes(2, nodeSettings); ensureStableCluster(3); final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Reduce disk space of all node until all of them is breaching high disk watermark. 
- clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) - ); + createTestIndices(nodeNames); + simulateDiskPressure(clusterInfoService); // Validate if cluster block is applied on the cluster assertBusy(() -> { @@ -241,45 +277,34 @@ public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithA }, 30L, TimeUnit.SECONDS); // Free all the space - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) - ); + releaseDiskPressure(clusterInfoService); // Validate if index create block is removed on the cluster. Need to refresh this periodically as well to remove // the node from high watermark breached list. assertBusy(() -> { - clusterInfoService.refresh(); + getMockInternalClusterInfoService().refresh(); ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState(); assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); }, 30L, TimeUnit.SECONDS); } public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithAutoReleaseDisabled() throws Exception { - final Settings settings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false) - .build(); - - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); + final Settings nodeSettings = buildTestSettings(false, false); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + var nodeNames = startTestNodes(2, nodeSettings); ensureStableCluster(3); final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Reduce disk space of all node until all of them is breaching high disk 
watermark - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) - ); + createTestIndices(nodeNames); + simulateDiskPressure(clusterInfoService); - // Validate if cluster block is applied on the cluster assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); }, 30L, TimeUnit.SECONDS); // Free all the space - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) - ); + releaseDiskPressure(clusterInfoService); // Validate index create block is not removed on the cluster assertBusy(() -> { @@ -289,23 +314,19 @@ public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithA } public void testDiskMonitorAppliesBlockBackWhenUserRemovesIndexCreateBlock() throws Exception { - final Settings settings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false) - .build(); - - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); + final Settings nodeSettings = buildTestSettings(false, false); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + var nodeNames = startTestNodes(2, nodeSettings); ensureStableCluster(3); + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + createTestIndices(nodeNames); + // User applies index create block. 
Settings createBlockSetting = Settings.builder().put(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey(), "true").build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(createBlockSetting).get()); - final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Reduce disk space of all node until all of them is breaching high disk watermark and DiskMonitor applies block. - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) - ); + + simulateDiskPressure(clusterInfoService); // Validate if cluster block is applied on the cluster assertBusy(() -> { @@ -327,29 +348,43 @@ public void testDiskMonitorAppliesBlockBackWhenUserRemovesIndexCreateBlock() thr } public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception { - final Settings settings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .build(); + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); - internalCluster().startClusterManagerOnlyNode(settings); - final List dataNodeNames = internalCluster().startDataOnlyNodes(2, settings); + final Settings nodeSettings = buildTestSettings(false, false); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + var nodeNames = startTestNodes(2, nodeSettings); ensureStableCluster(3); final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Create one of the index. - final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - createAndPopulateIndex(indexName, dataNodeNames.get(0)); - // Apply a read_only_allow_delete_block on one of the index - // (can happen if the corresponding node has breached flood stage watermark). 
- final Settings readOnlySettings = Settings.builder() - .put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()) - .build(); - client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get(); + + if (isWarmIndex) { + // Create indices + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String indexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, nodeNames.get(0), true); + createIndex(indexName2, nodeNames.get(1), true); + + // Apply a read_only_allow_delete_block on the indices + final Settings readOnlySettings = Settings.builder() + .put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()) + .build(); + client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get(); + client().admin().indices().prepareUpdateSettings(indexName2).setSettings(readOnlySettings).get(); + } else { + // Create one of the index. + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createAndPopulateIndex(indexName, nodeNames.get(0)); + + // Apply a read_only_allow_delete_block on one of the index + // (can happen if the corresponding node has breached flood stage watermark). + final Settings readOnlySettings = Settings.builder() + .put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()) + .build(); + client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get(); + } // Reduce disk space of all node until all of them is breaching high disk watermark. 
- clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) - ); + simulateDiskPressure(clusterInfoService); // Validate index create block is applied on the cluster assertBusy(() -> { @@ -358,7 +393,13 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception { }, 30L, TimeUnit.SECONDS); } + /** + * This test is excluded from parameterization and runs only once + */ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception { + // Skip this test when running with parameters to ensure it only runs once + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); + internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); final String dataNodeName = internalCluster().startDataOnlyNode(); @@ -431,19 +472,16 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti } public void testDiskMonitorResetLastRuntimeMilliSecOnlyInFirstCall() throws Exception { - final Settings settings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), false) - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .build(); - - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); + final Settings nodeSettings = buildTestSettings(false, false); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + var nodeNames = startTestNodes(2, nodeSettings); ensureStableCluster(3); final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - // Reduce disk space of all node. 
- clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, 0)); + createTestIndices(nodeNames); + // Reduce disk space of all node. + simulateDiskPressure(clusterInfoService); // Validate if cluster block is applied on the cluster assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); @@ -454,9 +492,7 @@ public void testDiskMonitorResetLastRuntimeMilliSecOnlyInFirstCall() throws Exce Settings removeBlockSetting = Settings.builder().put(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey(), "false").build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(removeBlockSetting).get()); // Free all the space - clusterInfoService.setDiskUsageFunctionAndRefresh( - (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) - ); + releaseDiskPressure(clusterInfoService); // Validate index create block is removed on the cluster assertBusy(() -> { @@ -471,13 +507,16 @@ private String populateNode(final String dataNodeName) throws Exception { return indexName; } - private long createAndPopulateIndex(final String indexName, final String nodeName) throws Exception { - + private void createIndex(String indexName, String nodeName, boolean isWarmIndex) throws Exception { final Settings.Builder indexSettingBuilder = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms") .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false); + if (isWarmIndex) { + indexSettingBuilder.put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true); + } + // Depending on node name specified or not, we determine whether to enable node name based shard routing for index // and whether reallocation is disabled on that index or not. 
if (nodeName != null) { @@ -499,7 +538,10 @@ private long createAndPopulateIndex(final String indexName, final String nodeNam indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6); createIndex(indexName, indexSettingBuilder.build()); } + } + private long createAndPopulateIndex(final String indexName, final String nodeName) throws Exception { + createIndex(indexName, nodeName, false); return createReasonableSizedShards(indexName); } @@ -600,6 +642,89 @@ private MockInternalClusterInfoService getMockInternalClusterInfoService() { return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); } + private Settings warmNodeSettings(ByteSizeValue cacheSize) { + return Settings.builder() + .put(super.nodeSettings(0)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + + /** + * Helper method to build test-specific settings with support for both hot and warm indices + */ + private Settings buildTestSettings(boolean diskThresholdEnabled, Boolean autoRelease) { + Settings.Builder settingsBuilder = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), diskThresholdEnabled); + + if (autoRelease != null) { + settingsBuilder.put(DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), autoRelease); + } + + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + settingsBuilder.put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath)); + } + + return settingsBuilder.build(); + } + + /** + * Helper method to start nodes that support both data and warm roles + */ + private List startTestNodes(int nodeCount, Settings additionalSettings) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + Settings nodeSettings = Settings.builder() + .put(additionalSettings) + .put(warmNodeSettings(new ByteSizeValue(TOTAL_SPACE_BYTES))) 
+ .build(); + return internalCluster().startDataAndWarmNodes(nodeCount, nodeSettings); + } else { + return internalCluster().startDataOnlyNodes(nodeCount, additionalSettings); + } + } + + /** + * Helper method to simulate disk pressure for both hot and warm indices + */ + private void simulateDiskPressure(MockInternalClusterInfoService clusterInfoService) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> TOTAL_SPACE_BYTES - WATERMARK_BYTES + 10); + } else { + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1) + ); + } + } + + /** + * Helper method to release disk pressure for both hot and warm indices + */ + private void releaseDiskPressure(MockInternalClusterInfoService clusterInfoService) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 100L); + } else { + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) + ); + } + } + + /** + * Helper method to create test indices for both hot and warm scenarios + */ + private void createTestIndices(List nodeNames) throws Exception { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex && nodeNames.size() >= 2) { + // Create warm indices on specific nodes + createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), nodeNames.get(0), true); + createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), nodeNames.get(1), true); + } + // For hot indices, no pre-creation needed as disk usage simulation handles it + } + private static class TestFileStore extends FilterFileStore { private final Path path; diff --git 
a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java index 4855158ca0cb8..bbcde768dfbd3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdEvaluator.java @@ -44,26 +44,26 @@ public interface DiskThresholdEvaluator { boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage); /** - * Calculates the free space low threshold for a given total space + * Get the free space low threshold for a given total space * * @param totalSpace total available space * @return free space low threshold in bytes */ - long calculateFreeSpaceLowThreshold(long totalSpace); + long getFreeSpaceLowThreshold(long totalSpace); /** - * Calculates the free space high threshold for a given total space + * Get the free space high threshold for a given total space * * @param totalSpace total available space * @return free space high threshold in bytes */ - long calculateFreeSpaceHighThreshold(long totalSpace); + long getFreeSpaceHighThreshold(long totalSpace); /** - * Calculates the free space flood stage threshold for a given total space + * Get the free space flood stage threshold for a given total space * * @param totalSpace total available space * @return free space flood stage threshold in bytes */ - long calculateFreeSpaceFloodStageThreshold(long totalSpace); + long getFreeSpaceFloodStageThreshold(long totalSpace); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index ff8f69b94f999..c1ccb0380aa99 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -174,15 +174,16 @@ public void onNewInfo(ClusterInfo info) { for (final Map.Entry entry : usages.entrySet()) { final String node = entry.getKey(); - // Create DiskUsage for Warm Nodes based on total Addressable Space DiskUsage usage = entry.getValue(); final RoutingNode routingNode = routingNodes.node(node); if (routingNode == null) { continue; } + // Only for Dedicated Warm Nodes final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode)); if (isWarmNode) { + // Create DiskUsage for Warm Nodes based on total Addressable Space usage = getWarmDiskUsage(usage, info, routingNode, state); } @@ -219,7 +220,6 @@ public void onNewInfo(ClusterInfo info) { } } - // Check if for warm node if reserved space comes out to be zero, if not make it 0 final long reservedSpace = info.getReservedSpace(usage.getNodeId(), usage.getPath()).getTotal(); final DiskUsage usageWithReservedSpace = new DiskUsage( usage.getNodeId(), @@ -229,7 +229,6 @@ public void onNewInfo(ClusterInfo info) { Math.max(0L, usage.getFreeBytes() - reservedSpace) ); - // Check if node exceeds high watermark with reserved space factored in if (isNodeExceedingHighWatermark(usageWithReservedSpace, isWarmNode)) { nodesOverLowThreshold.add(node); @@ -456,7 +455,7 @@ private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, Routin .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getIndexPool(state.metadata().getIndexSafe(shard.index())))) .collect(Collectors.toList()); - var remoteShardSize = 0L; + long remoteShardSize = 0L; for (ShardRouting shard : remoteShardsOnNode) { remoteShardSize += DiskThresholdDecider.getExpectedShardSize(shard, 0L, info, null, state.metadata(), state.getRoutingTable()); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java index 55340444275fc..1ace462965bd7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/HotNodeDiskThresholdEvaluator.java @@ -45,56 +45,48 @@ public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) { } @Override - public long calculateFreeSpaceLowThreshold(long totalSpace) { - // For hot data nodes, we use the standard disk threshold settings - // Check for absolute bytes threshold first - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes(); - } - - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdLow(); - if (percentageThreshold > 0) { - return (long) (totalSpace * percentageThreshold / 100.0); - } - - // Default fallback - return 0; + public long getFreeSpaceLowThreshold(long totalAddressableSpace) { + return calculateFreeSpaceWatermarkThreshold( + diskThresholdSettings.getFreeDiskThresholdLow(), + diskThresholdSettings.getFreeBytesThresholdLow(), + totalAddressableSpace + ); } @Override - public long calculateFreeSpaceHighThreshold(long totalSpace) { - // For hot data nodes, we use the standard disk threshold settings - // Check for absolute bytes threshold first - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes(); - } - - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); - if (percentageThreshold > 0) { - return (long) (totalSpace * percentageThreshold / 100.0); - } - - // Default fallback - return 0; + public long 
getFreeSpaceHighThreshold(long totalAddressableSpace) { + return calculateFreeSpaceWatermarkThreshold( + diskThresholdSettings.getFreeDiskThresholdHigh(), + diskThresholdSettings.getFreeBytesThresholdHigh(), + totalAddressableSpace + ); } @Override - public long calculateFreeSpaceFloodStageThreshold(long totalSpace) { + public long getFreeSpaceFloodStageThreshold(long totalAddressableSpace) { + return calculateFreeSpaceWatermarkThreshold( + diskThresholdSettings.getFreeDiskThresholdFloodStage(), + diskThresholdSettings.getFreeBytesThresholdFloodStage(), + totalAddressableSpace + ); + } + + private long calculateFreeSpaceWatermarkThreshold( + double freeDiskWatermarkThreshold, + ByteSizeValue freeBytesWatermarkThreshold, + long totalAddressableSpace + ) { // For hot data nodes, we use the standard disk threshold settings // Check for absolute bytes threshold first - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdFloodStage(); + ByteSizeValue bytesThreshold = freeBytesWatermarkThreshold; if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { return bytesThreshold.getBytes(); } // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdFloodStage(); + double percentageThreshold = freeDiskWatermarkThreshold; if (percentageThreshold > 0) { - return (long) (totalSpace * percentageThreshold / 100.0); + return (long) (totalAddressableSpace * percentageThreshold / 100.0); } // Default fallback diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java index 950defdb3eb83..ed38e9c1212ab 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java @@ -11,6 +11,7 @@ import 
org.opensearch.cluster.DiskUsage; import org.opensearch.core.common.unit.ByteSizeValue; +import java.util.function.Function; import java.util.function.Supplier; /** @@ -32,91 +33,71 @@ public WarmNodeDiskThresholdEvaluator(DiskThresholdSettings diskThresholdSetting @Override public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage) { - if (dataToFileCacheSizeRatioSupplier.get() <= 0) { - return false; - } - long totalBytes = diskUsage.getTotalBytes(); - long freeSpace = diskUsage.getFreeBytes(); - long freeSpaceLowThreshold = calculateFreeSpaceLowThreshold(totalBytes); - - return freeSpace < freeSpaceLowThreshold; + return isNodeExceedingWatermark(diskUsage, this::getFreeSpaceLowThreshold); } @Override public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage) { - if (dataToFileCacheSizeRatioSupplier.get() <= 0) { - return false; - } - long totalBytes = diskUsage.getTotalBytes(); - long freeSpace = diskUsage.getFreeBytes(); - long freeSpaceHighThreshold = calculateFreeSpaceHighThreshold(totalBytes); - - return freeSpace < freeSpaceHighThreshold; + return isNodeExceedingWatermark(diskUsage, this::getFreeSpaceHighThreshold); } @Override public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) { + return isNodeExceedingWatermark(diskUsage, this::getFreeSpaceFloodStageThreshold); + } + + private boolean isNodeExceedingWatermark(DiskUsage diskUsage, Function thresholdFunction) { if (dataToFileCacheSizeRatioSupplier.get() <= 0) { return false; } long totalBytes = diskUsage.getTotalBytes(); long freeSpace = diskUsage.getFreeBytes(); - long freeSpaceFloodStageThreshold = calculateFreeSpaceFloodStageThreshold(totalBytes); + long freeSpaceThreshold = thresholdFunction.apply(totalBytes); - return freeSpace < freeSpaceFloodStageThreshold; + return freeSpace < freeSpaceThreshold; } @Override - public long calculateFreeSpaceLowThreshold(long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = 
diskThresholdSettings.getFreeDiskThresholdLow(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; + public long getFreeSpaceLowThreshold(long totalAddressableSpace) { + return calculateFreeSpaceWatermarkThreshold( + diskThresholdSettings.getFreeDiskThresholdLow(), + diskThresholdSettings.getFreeBytesThresholdLow(), + totalAddressableSpace + ); } @Override - public long calculateFreeSpaceHighThreshold(long totalAddressableSpace) { - // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); - } - - // Check for absolute bytes threshold - final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; - } - - // Default fallback - return 0; + public long getFreeSpaceHighThreshold(long totalAddressableSpace) { + return calculateFreeSpaceWatermarkThreshold( + diskThresholdSettings.getFreeDiskThresholdHigh(), + diskThresholdSettings.getFreeBytesThresholdHigh(), + totalAddressableSpace + ); } @Override - public long calculateFreeSpaceFloodStageThreshold(long totalAddressableSpace) { + public long getFreeSpaceFloodStageThreshold(long totalAddressableSpace) { + return calculateFreeSpaceWatermarkThreshold( + 
diskThresholdSettings.getFreeDiskThresholdFloodStage(), + diskThresholdSettings.getFreeBytesThresholdFloodStage(), + totalAddressableSpace + ); + } + + private long calculateFreeSpaceWatermarkThreshold( + double freeDiskWatermarkThreshold, + ByteSizeValue freeBytesWatermarkThreshold, + long totalAddressableSpace + ) { // Check for percentage-based threshold - double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdFloodStage(); - if (percentageThreshold > 0) { - return (long) (totalAddressableSpace * percentageThreshold / 100.0); + if (freeDiskWatermarkThreshold > 0) { + return (long) (totalAddressableSpace * freeDiskWatermarkThreshold / 100.0); } // Check for absolute bytes threshold final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); - ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdFloodStage(); - if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { - return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + if (freeBytesWatermarkThreshold != null && freeBytesWatermarkThreshold.getBytes() > 0) { + return freeBytesWatermarkThreshold.getBytes() * (long) dataToFileCacheSizeRatio; } // Default fallback diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java index 4bc58e4872eb3..b25c2719d1849 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java @@ -129,7 +129,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, false); final long freeSpace = totalAddressableSpace - currentNodeRemoteShardSize; final long freeSpaceAfterAllocation = 
freeSpace > shardSize ? freeSpace - shardSize : 0; - final long freeSpaceLowThreshold = diskThresholdEvaluator.calculateFreeSpaceLowThreshold(totalAddressableSpace); + final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(totalAddressableSpace); final ByteSizeValue freeSpaceLowThresholdInByteSize = new ByteSizeValue(freeSpaceLowThreshold); final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); @@ -189,7 +189,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, true); final long freeSpace = totalAddressableSpace - currentNodeRemoteShardSize; - final long freeSpaceHighThreshold = diskThresholdEvaluator.calculateFreeSpaceHighThreshold(totalAddressableSpace); + final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(totalAddressableSpace); final ByteSizeValue freeSpaceHighThresholdInByteSize = new ByteSizeValue(freeSpaceHighThreshold); final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); From 3570c0db9a3cb1c68c1236984acf4f9dfc903cd0 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Wed, 11 Jun 2025 19:41:58 +0530 Subject: [PATCH 05/12] gradlew spotless fix Signed-off-by: Gagan Singh Saini --- .../routing/allocation/decider/DiskThresholdDeciderIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 16c8ff5108d43..3e6b2eb89f5c7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -60,7 +60,6 @@ import 
org.opensearch.common.io.PathUtils; import org.opensearch.common.io.PathUtilsForTesting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.env.Environment; From e1a7fa66d10eb6eb0b85b5454643ffce82a5971e Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Wed, 11 Jun 2025 21:28:34 +0530 Subject: [PATCH 06/12] Resolve comments Signed-off-by: Gagan Singh Saini --- .../allocation/DiskThresholdMonitor.java | 36 ++++++----------- .../routing/allocation/NodeDiskEvaluator.java | 40 +++++++++++++++++++ 2 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index c1ccb0380aa99..c56f5a0df7d81 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -89,8 +89,7 @@ public class DiskThresholdMonitor { private final Supplier dataToFileCacheSizeRatioSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; - private final HotNodeDiskThresholdEvaluator hotNodeEvaluator; - private final WarmNodeDiskThresholdEvaluator warmNodeEvaluator; + private final NodeDiskEvaluator nodeDiskEvaluator; private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); private final AtomicBoolean checkInProgress = new AtomicBoolean(); @@ -126,8 +125,12 @@ public DiskThresholdMonitor( this.rerouteService = rerouteService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; - 
this.hotNodeEvaluator = new HotNodeDiskThresholdEvaluator(diskThresholdSettings); - this.warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier); + DiskThresholdEvaluator hotNodeEvaluator = new HotNodeDiskThresholdEvaluator(diskThresholdSettings); + DiskThresholdEvaluator warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator( + diskThresholdSettings, + dataToFileCacheSizeRatioSupplier + ); + this.nodeDiskEvaluator = new NodeDiskEvaluator(hotNodeEvaluator, warmNodeEvaluator); this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier; } @@ -187,7 +190,7 @@ public void onNewInfo(ClusterInfo info) { usage = getWarmDiskUsage(usage, info, routingNode, state); } - if (isNodeExceedingFloodStageWatermark(usage, isWarmNode)) { + if (nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage, isWarmNode)) { nodesOverLowThreshold.add(node); nodesOverHighThreshold.add(node); @@ -210,7 +213,7 @@ public void onNewInfo(ClusterInfo info) { continue; } - if (isNodeExceedingHighWatermark(usage, isWarmNode)) { + if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usage, isWarmNode)) { if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step for (ShardRouting routing : routingNode) { @@ -229,7 +232,7 @@ public void onNewInfo(ClusterInfo info) { Math.max(0L, usage.getFreeBytes() - reservedSpace) ); - if (isNodeExceedingHighWatermark(usageWithReservedSpace, isWarmNode)) { + if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageWithReservedSpace, isWarmNode)) { nodesOverLowThreshold.add(node); nodesOverHighThreshold.add(node); @@ -247,7 +250,7 @@ public void onNewInfo(ClusterInfo info) { ); } - } else if (isNodeExceedingLowWatermark(usage, isWarmNode)) { + } else if (nodeDiskEvaluator.isNodeExceedingLowWatermark(usage, isWarmNode)) { nodesOverHighThresholdAndRelocating.remove(node); @@ -328,7 +331,7 @@ public void onNewInfo(ClusterInfo info) { } boolean 
isNodeWarm = REMOTE_CAPABLE.equals(getNodePool(routingNode)); - if (isNodeExceedingHighWatermark(usageIncludingRelocations, isNodeWarm)) { + if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageIncludingRelocations, isNodeWarm)) { nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId()); logger.warn( @@ -431,21 +434,6 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste ); } - private boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage, boolean isWarmNode) { - DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator; - return evaluator.isNodeExceedingFloodStageWatermark(diskUsage); - } - - private boolean isNodeExceedingHighWatermark(DiskUsage diskUsage, boolean isWarmNode) { - DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator; - return evaluator.isNodeExceedingHighWatermark(diskUsage); - } - - private boolean isNodeExceedingLowWatermark(DiskUsage diskUsage, boolean isWarmNode) { - DiskThresholdEvaluator evaluator = isWarmNode ? 
warmNodeEvaluator : hotNodeEvaluator; - return evaluator.isNodeExceedingLowWatermark(diskUsage); - } - private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) { double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get(); AggregateFileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java new file mode 100644 index 0000000000000..36e36e8b86201 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.DiskUsage; + +public class NodeDiskEvaluator { + + private final DiskThresholdEvaluator hotNodeEvaluator; + private final DiskThresholdEvaluator warmNodeEvaluator; + + public NodeDiskEvaluator(DiskThresholdEvaluator hotEvaluator, DiskThresholdEvaluator warmEvaluator) { + this.hotNodeEvaluator = hotEvaluator; + this.warmNodeEvaluator = warmEvaluator; + } + + public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage, boolean isWarmNode) { + return isWarmNode + ? warmNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage) + : hotNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage); + } + + public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage, boolean isWarmNode) { + return isWarmNode + ? 
warmNodeEvaluator.isNodeExceedingHighWatermark(diskUsage) + : hotNodeEvaluator.isNodeExceedingHighWatermark(diskUsage); + } + + public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage, boolean isWarmNode) { + return isWarmNode + ? warmNodeEvaluator.isNodeExceedingLowWatermark(diskUsage) + : hotNodeEvaluator.isNodeExceedingLowWatermark(diskUsage); + } +} From 4209aabb3dce86b1b70e6ee00fb1716184cded09 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Thu, 12 Jun 2025 12:18:44 +0530 Subject: [PATCH 07/12] small change Signed-off-by: Gagan Singh Saini --- .../routing/allocation/decider/DiskThresholdDeciderIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 3e6b2eb89f5c7..528c3fd62b81a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -136,7 +136,7 @@ public static Collection parameters() { private FileSystem defaultFileSystem; - protected static final String BASE_REMOTE_REPO = "test-rs-repo" + "__rs"; + protected static final String BASE_REMOTE_REPO = "test-rs-repo"; protected Path remoteRepoPath; @Before @@ -397,7 +397,7 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception { */ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception { // Skip this test when running with parameters to ensure it only runs once - assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); + assumeTrue("Test should only run in the default (non-parameterized) test suite", 
WRITABLE_WARM_INDEX_SETTING.get(settings) == false); internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); From bdd394549b82e8c63616dbd32f5e5e955e80a399 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Thu, 12 Jun 2025 12:21:07 +0530 Subject: [PATCH 08/12] Add Changelog Signed-off-by: Gagan Singh Saini --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c12a0e07d9cec..6f7b745cbc110 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Rule based auto-tagging] Add update rule API ([#17797](https://github.com/opensearch-project/OpenSearch/pull/17797)) - Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/)) - Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082)) +- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375)) - Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988)) - [Rule based auto-tagging] Add refresh based synchronization service for `Rule`s ([#18128](https://github.com/opensearch-project/OpenSearch/pull/18128)) - Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088)) From 6e7922c115e1a54271a98cb5236be7c8d0b11829 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Thu, 12 Jun 2025 12:50:38 +0530 Subject: [PATCH 09/12] Spotless fix Signed-off-by: Gagan Singh Saini --- .../decider/DiskThresholdDeciderIT.java | 5 ++- .../routing/allocation/NodeDiskEvaluator.java | 32 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git 
a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 528c3fd62b81a..912c48d7bbdfe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -397,7 +397,10 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception { */ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception { // Skip this test when running with parameters to ensure it only runs once - assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == false); + assumeTrue( + "Test should only run in the default (non-parameterized) test suite", + WRITABLE_WARM_INDEX_SETTING.get(settings) == false + ); internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java index 36e36e8b86201..3f900b04b87a8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java @@ -10,28 +10,60 @@ import org.opensearch.cluster.DiskUsage; +/** + * Evaluates disk usage thresholds for hot and warm nodes in the cluster. + * This class provides methods to check if nodes are exceeding various disk watermark levels + * based on their type (hot or warm) and current disk usage. 
+ */ public class NodeDiskEvaluator { private final DiskThresholdEvaluator hotNodeEvaluator; private final DiskThresholdEvaluator warmNodeEvaluator; + /** + * Constructs a NodeDiskEvaluator with separate evaluators for hot and warm nodes + * + * @param hotEvaluator The evaluator for hot nodes + * @param warmEvaluator The evaluator for warm nodes + */ public NodeDiskEvaluator(DiskThresholdEvaluator hotEvaluator, DiskThresholdEvaluator warmEvaluator) { this.hotNodeEvaluator = hotEvaluator; this.warmNodeEvaluator = warmEvaluator; } + /** + * Checks if a node is exceeding the flood stage watermark based on its type and disk usage + * + * @param diskUsage The current disk usage of the node + * @param isWarmNode Whether the node is a warm node + * @return true if the node is exceeding flood stage watermark, false otherwise + */ public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage, boolean isWarmNode) { return isWarmNode ? warmNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage) : hotNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage); } + /** + * Checks if a node is exceeding the high watermark based on its type and disk usage + * + * @param diskUsage The current disk usage of the node + * @param isWarmNode Whether the node is a warm node + * @return true if the node is exceeding high watermark, false otherwise + */ public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage, boolean isWarmNode) { return isWarmNode ? warmNodeEvaluator.isNodeExceedingHighWatermark(diskUsage) : hotNodeEvaluator.isNodeExceedingHighWatermark(diskUsage); } + /** + * Checks if a node is exceeding the low watermark based on its type and disk usage + * + * @param diskUsage The current disk usage of the node + * @param isWarmNode Whether the node is a warm node + * @return true if the node is exceeding low watermark, false otherwise + */ public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage, boolean isWarmNode) { return isWarmNode ? 
warmNodeEvaluator.isNodeExceedingLowWatermark(diskUsage) From 326f1ea0ed33b7648db6e8a67f9d9b779ea307a5 Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Mon, 16 Jun 2025 14:07:02 +0530 Subject: [PATCH 10/12] Resolving comments Signed-off-by: Gagan Singh Saini --- .../allocation/DiskThresholdMonitor.java | 19 ++- .../routing/allocation/NodeDiskEvaluator.java | 108 +++++++++++------- 2 files changed, 77 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index c56f5a0df7d81..b66b081e8a317 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -125,12 +125,7 @@ public DiskThresholdMonitor( this.rerouteService = rerouteService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; - DiskThresholdEvaluator hotNodeEvaluator = new HotNodeDiskThresholdEvaluator(diskThresholdSettings); - DiskThresholdEvaluator warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator( - diskThresholdSettings, - dataToFileCacheSizeRatioSupplier - ); - this.nodeDiskEvaluator = new NodeDiskEvaluator(hotNodeEvaluator, warmNodeEvaluator); + this.nodeDiskEvaluator = new NodeDiskEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier); this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier; } @@ -185,12 +180,13 @@ public void onNewInfo(ClusterInfo info) { // Only for Dedicated Warm Nodes final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode)); + nodeDiskEvaluator.setNodeType(isWarmNode); if (isWarmNode) { // Create DiskUsage for Warm Nodes based on total Addressable Space usage = getWarmDiskUsage(usage, info, routingNode, state); } - if 
(nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage, isWarmNode)) { + if (nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage)) { nodesOverLowThreshold.add(node); nodesOverHighThreshold.add(node); @@ -213,7 +209,7 @@ public void onNewInfo(ClusterInfo info) { continue; } - if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usage, isWarmNode)) { + if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usage)) { if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step for (ShardRouting routing : routingNode) { @@ -232,7 +228,7 @@ public void onNewInfo(ClusterInfo info) { Math.max(0L, usage.getFreeBytes() - reservedSpace) ); - if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageWithReservedSpace, isWarmNode)) { + if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageWithReservedSpace)) { nodesOverLowThreshold.add(node); nodesOverHighThreshold.add(node); @@ -250,7 +246,7 @@ public void onNewInfo(ClusterInfo info) { ); } - } else if (nodeDiskEvaluator.isNodeExceedingLowWatermark(usage, isWarmNode)) { + } else if (nodeDiskEvaluator.isNodeExceedingLowWatermark(usage)) { nodesOverHighThresholdAndRelocating.remove(node); @@ -331,7 +327,8 @@ public void onNewInfo(ClusterInfo info) { } boolean isNodeWarm = REMOTE_CAPABLE.equals(getNodePool(routingNode)); - if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageIncludingRelocations, isNodeWarm)) { + nodeDiskEvaluator.setNodeType(isNodeWarm); + if (nodeDiskEvaluator.isNodeExceedingHighWatermark(usageIncludingRelocations)) { nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId()); logger.warn( diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java index 3f900b04b87a8..9635aaaa557e5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java +++ 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeDiskEvaluator.java @@ -10,63 +10,93 @@ import org.opensearch.cluster.DiskUsage; +import java.util.function.Supplier; + /** * Evaluates disk usage thresholds for hot and warm nodes in the cluster. * This class provides methods to check if nodes are exceeding various disk watermark levels * based on their type (hot or warm) and current disk usage. */ -public class NodeDiskEvaluator { +public class NodeDiskEvaluator implements DiskThresholdEvaluator { + + /** + * Enum representing different node types for disk threshold evaluation + * + * @opensearch.internal + */ + enum NodeType { + HOT, + WARM + } private final DiskThresholdEvaluator hotNodeEvaluator; private final DiskThresholdEvaluator warmNodeEvaluator; + private NodeType nodeType; /** * Constructs a NodeDiskEvaluator with separate evaluators for hot and warm nodes * - * @param hotEvaluator The evaluator for hot nodes - * @param warmEvaluator The evaluator for warm nodes + * @param diskThresholdSettings Disk Threshold Settings + * @param dataToFileCacheSizeRatioSupplier Supplier for remote_data_ratio */ - public NodeDiskEvaluator(DiskThresholdEvaluator hotEvaluator, DiskThresholdEvaluator warmEvaluator) { - this.hotNodeEvaluator = hotEvaluator; - this.warmNodeEvaluator = warmEvaluator; + public NodeDiskEvaluator(DiskThresholdSettings diskThresholdSettings, Supplier dataToFileCacheSizeRatioSupplier) { + this.hotNodeEvaluator = new HotNodeDiskThresholdEvaluator(diskThresholdSettings); + this.warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier); } - /** - * Checks if a node is exceeding the flood stage watermark based on its type and disk usage - * - * @param diskUsage The current disk usage of the node - * @param isWarmNode Whether the node is a warm node - * @return true if the node is exceeding flood stage watermark, false otherwise - */ - public boolean 
isNodeExceedingFloodStageWatermark(DiskUsage diskUsage, boolean isWarmNode) { - return isWarmNode - ? warmNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage) - : hotNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage); + public void setNodeType(boolean isWarmNode) { + if (isWarmNode) { + nodeType = NodeType.WARM; + } else { + nodeType = NodeType.HOT; + } } - /** - * Checks if a node is exceeding the high watermark based on its type and disk usage - * - * @param diskUsage The current disk usage of the node - * @param isWarmNode Whether the node is a warm node - * @return true if the node is exceeding high watermark, false otherwise - */ - public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage, boolean isWarmNode) { - return isWarmNode - ? warmNodeEvaluator.isNodeExceedingHighWatermark(diskUsage) - : hotNodeEvaluator.isNodeExceedingHighWatermark(diskUsage); + @Override + public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage) { + if (nodeType == NodeType.HOT) { + return hotNodeEvaluator.isNodeExceedingLowWatermark(diskUsage); + } + return warmNodeEvaluator.isNodeExceedingLowWatermark(diskUsage); } - /** - * Checks if a node is exceeding the low watermark based on its type and disk usage - * - * @param diskUsage The current disk usage of the node - * @param isWarmNode Whether the node is a warm node - * @return true if the node is exceeding low watermark, false otherwise - */ - public boolean isNodeExceedingLowWatermark(DiskUsage diskUsage, boolean isWarmNode) { - return isWarmNode - ? 
warmNodeEvaluator.isNodeExceedingLowWatermark(diskUsage) - : hotNodeEvaluator.isNodeExceedingLowWatermark(diskUsage); + @Override + public boolean isNodeExceedingHighWatermark(DiskUsage diskUsage) { + if (nodeType == NodeType.HOT) { + return hotNodeEvaluator.isNodeExceedingHighWatermark(diskUsage); + } + return warmNodeEvaluator.isNodeExceedingHighWatermark(diskUsage); + } + + @Override + public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) { + if (nodeType == NodeType.HOT) { + return hotNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage); + } + return warmNodeEvaluator.isNodeExceedingFloodStageWatermark(diskUsage); + } + + @Override + public long getFreeSpaceLowThreshold(long totalSpace) { + if (nodeType == NodeType.HOT) { + return hotNodeEvaluator.getFreeSpaceLowThreshold(totalSpace); + } + return warmNodeEvaluator.getFreeSpaceLowThreshold(totalSpace); + } + + @Override + public long getFreeSpaceHighThreshold(long totalSpace) { + if (nodeType == NodeType.HOT) { + return hotNodeEvaluator.getFreeSpaceHighThreshold(totalSpace); + } + return warmNodeEvaluator.getFreeSpaceHighThreshold(totalSpace); + } + + @Override + public long getFreeSpaceFloodStageThreshold(long totalSpace) { + if (nodeType == NodeType.HOT) { + return hotNodeEvaluator.getFreeSpaceFloodStageThreshold(totalSpace); + } + return warmNodeEvaluator.getFreeSpaceFloodStageThreshold(totalSpace); } } From d42ea62ded607af139457d4004736cd8eb76dccd Mon Sep 17 00:00:00 2001 From: Gagan Singh Saini Date: Wed, 18 Jun 2025 10:44:08 +0530 Subject: [PATCH 11/12] Handle 0 total addressable space value Signed-off-by: Gagan Singh Saini --- .../cluster/routing/allocation/DiskThresholdMonitor.java | 2 +- .../routing/allocation/WarmNodeDiskThresholdEvaluator.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index b66b081e8a317..0b6b96f1f8c0a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -449,7 +449,7 @@ private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, Routin diskUsage.getNodeName(), diskUsage.getPath(), totalAddressableSpace, - totalAddressableSpace - remoteShardSize + Math.max(0, totalAddressableSpace - remoteShardSize) ); return warmDiskUsage; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java index ed38e9c1212ab..93dfc09e6f3f0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/WarmNodeDiskThresholdEvaluator.java @@ -47,10 +47,10 @@ public boolean isNodeExceedingFloodStageWatermark(DiskUsage diskUsage) { } private boolean isNodeExceedingWatermark(DiskUsage diskUsage, Function thresholdFunction) { - if (dataToFileCacheSizeRatioSupplier.get() <= 0) { + long totalBytes = diskUsage.getTotalBytes(); + if (dataToFileCacheSizeRatioSupplier.get() <= 0 || totalBytes <= 0) { return false; } - long totalBytes = diskUsage.getTotalBytes(); long freeSpace = diskUsage.getFreeBytes(); long freeSpaceThreshold = thresholdFunction.apply(totalBytes); From 5668c5c630784ad0465f1cbdf801df764d563c15 Mon Sep 17 00:00:00 2001 From: Gagan6164 <32809202+Gagan6164@users.noreply.github.com> Date: Wed, 18 Jun 2025 12:09:17 +0530 Subject: [PATCH 12/12] Update CHANGELOG.md Signed-off-by: Gagan6164 <32809202+Gagan6164@users.noreply.github.com> --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 
8f0964fa665c5..e7a5cfda0a4c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.x] ### Added +- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375)) - Ability to run Code Coverage with Gradle and produce the jacoco reports locally ([#18509](https://github.com/opensearch-project/OpenSearch/issues/18509)) ### Changed