diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e2c1ee1a7677..7ec1ef22f6c7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782)) - Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039)) - Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/)) +- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082)) - Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988)) - Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088)) - Adding support for derive source feature and implementing it for various type of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759)) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index baa71cf04e8d6..fc7dae7854ff2 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -81,6 +81,7 @@ import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.WarmDiskThresholdDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.AbstractModule; 
import org.opensearch.common.settings.ClusterSettings; @@ -393,6 +394,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new SearchReplicaAllocationDecider()); addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings)); + addAllocationDecider(deciders, new WarmDiskThresholdDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java index 21d471c829787..d8306079b30c0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java @@ -60,6 +60,17 @@ public class DiskThresholdSettings { Setting.Property.Dynamic, Setting.Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING = Setting.boolSetting( + "cluster.routing.allocation.disk.warm_threshold_enabled", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting( + "cluster.routing.allocation.disk.watermark.enable_for_single_data_node", + false, + Setting.Property.NodeScope + ); public static final Setting CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = new Setting<>( "cluster.routing.allocation.disk.watermark.low", "85%", @@ -113,6 +124,7 @@ public class DiskThresholdSettings { private volatile boolean includeRelocations; private volatile 
boolean createIndexBlockAutoReleaseEnabled; private volatile boolean enabled; + private volatile boolean warmThresholdEnabled; private volatile TimeValue rerouteInterval; private volatile Double freeDiskThresholdFloodStage; private volatile ByteSizeValue freeBytesThresholdFloodStage; @@ -139,6 +151,7 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings); this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings); this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); + this.warmThresholdEnabled = CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING.get(settings); this.createIndexBlockAutoReleaseEnabled = CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark); @@ -146,6 +159,10 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING, + this::setWarmThresholdEnabled + ); clusterSettings.addSettingsUpdateConsumer(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE, this::setCreateIndexBlockAutoReleaseEnabled); } @@ -311,6 +328,10 @@ private void setEnabled(boolean enabled) { this.enabled = enabled; } + private void setWarmThresholdEnabled(boolean 
enabled) { + this.warmThresholdEnabled = enabled; + } + private void setLowWatermark(String lowWatermark) { // Watermark is expressed in terms of used data, but we need "free" data watermark this.lowWatermarkRaw = lowWatermark; @@ -390,6 +411,10 @@ public boolean isEnabled() { return enabled; } + public boolean isWarmThresholdEnabled() { + return warmThresholdEnabled; + } + public TimeValue getRerouteInterval() { return rerouteInterval; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 5fc3f282f33f7..b6715e3ecb458 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -48,27 +48,24 @@ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.remote.filecache.FileCacheSettings; -import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.snapshots.SnapshotShardSizeInfo; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; import static org.opensearch.cluster.routing.RoutingPool.getNodePool; import static org.opensearch.cluster.routing.RoutingPool.getShardPool; import static 
org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.ENABLE_FOR_SINGLE_DATA_NODE; /** * The {@link DiskThresholdDecider} checks that the node a shard is potentially @@ -101,12 +98,6 @@ public class DiskThresholdDecider extends AllocationDecider { public static final String NAME = "disk_threshold"; - public static final Setting ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting( - "cluster.routing.allocation.disk.watermark.enable_for_single_data_node", - false, - Setting.Property.NodeScope - ); - private final DiskThresholdSettings diskThresholdSettings; private final boolean enableForSingleDataNode; private final FileCacheSettings fileCacheSettings; @@ -176,44 +167,9 @@ public static long sizeOfRelocatingShards( public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { ClusterInfo clusterInfo = allocation.clusterInfo(); - /* - The following block enables allocation for remote shards within safeguard limits of the filecache. 
- */ + // For this case WarmDiskThresholdDecider Decider will take decision if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) { - final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); - // we don't need to check the ratio - if (dataToFileCacheSizeRatio <= 0.1f) { - return Decision.YES; - } - - final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) - .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation))) - .collect(Collectors.toList()); - final long currentNodeRemoteShardSize = remoteShardsOnNode.stream() - .map(ShardRouting::getExpectedShardSize) - .mapToLong(Long::longValue) - .sum(); - - final long shardSize = getExpectedShardSize( - shardRouting, - 0L, - allocation.clusterInfo(), - allocation.snapshotShardSizeInfo(), - allocation.metadata(), - allocation.routingTable() - ); - - final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null); - final long nodeCacheSize = fileCacheStats != null ? 
fileCacheStats.getTotal().getBytes() : 0; - final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize; - if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) { - return allocation.decision( - Decision.NO, - NAME, - "file cache limit reached - remote shard size will exceed configured safeguard ratio" - ); - } - return Decision.YES; + return Decision.ALWAYS; } else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) { return Decision.NO; } @@ -481,12 +437,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]"); } - /* - The following block prevents movement for remote shards since they do not use the local storage as - the primary source of data storage. - */ + // For this case WarmDiskThresholdDecider Decider will take decision if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) { return Decision.ALWAYS; + } else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) { + return Decision.NO; } final ClusterInfo clusterInfo = allocation.clusterInfo(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java new file mode 100644 index 0000000000000..c5db09fcbd608 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java @@ -0,0 +1,328 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.store.remote.filecache.FileCacheSettings; +import org.opensearch.index.store.remote.filecache.FileCacheStats; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; +import static org.opensearch.cluster.routing.RoutingPool.getNodePool; +import static org.opensearch.cluster.routing.RoutingPool.getShardPool; +import static 
org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.ENABLE_FOR_SINGLE_DATA_NODE; + +/** + * The {@link WarmDiskThresholdDecider} checks that the node a shard is potentially + * being allocated to has enough remote addressable space (calculated using remote + * data ratio and total file cache size). + *

+ * It has three configurable settings, all of which can be changed dynamically: + *

+ * cluster.routing.allocation.disk.watermark.low is the low disk + * watermark. New shards will not be allocated to a node with remote shard size higher than this. + * It defaults to 0.85 (85.0%). + *

+ * cluster.routing.allocation.disk.watermark.high is the high disk + * watermark. If a node has total remote shard size higher than this, shards are not allowed to + * remain on the node. In addition, if allocating a shard to a node causes the + * node to pass this watermark, it will not be allowed. It defaults to + * 0.90 (90.0%). + *

+ * Both watermark settings are expressed in terms of used disk percentage, or + * exact byte values for free space (like "500mb"). + *

+ * cluster.routing.allocation.disk.warm_threshold_enabled is used to + * enable or disable this decider. It defaults to true (enabled). + * + * @opensearch.internal + */ +public class WarmDiskThresholdDecider extends AllocationDecider { + + private static final Logger logger = LogManager.getLogger(WarmDiskThresholdDecider.class); + + public static final String NAME = "warm_disk_threshold"; + + private final FileCacheSettings fileCacheSettings; + private final DiskThresholdSettings diskThresholdSettings; + private final boolean enableForSingleDataNode; + + public WarmDiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) { + this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings); + this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); + assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9"; + this.enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + var nodeRoutingPool = getNodePool(node); + var shardRoutingPool = getShardPool(shardRouting, allocation); + if (nodeRoutingPool != REMOTE_CAPABLE || shardRoutingPool != REMOTE_CAPABLE) { + return Decision.ALWAYS; + } + + final Decision decision = earlyTerminate(node, allocation); + if (decision != null) { + return decision; + } + + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shardRouting, + 0L, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + + final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation); + final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, false); + final long freeSpace = totalAddressableSpace - currentNodeRemoteShardSize; + final long freeSpaceAfterAllocation = freeSpace > shardSize ? 
freeSpace - shardSize : 0; + final long freeSpaceLowThreshold = calculateFreeSpaceLowThreshold(diskThresholdSettings, totalAddressableSpace); + + final ByteSizeValue freeSpaceLowThresholdInByteSize = new ByteSizeValue(freeSpaceLowThreshold); + final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); + final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(freeSpaceAfterAllocation); + final ByteSizeValue shardSizeInByteSize = new ByteSizeValue(shardSize); + + if (freeSpaceAfterAllocation < freeSpaceLowThreshold) { + logger.warn( + "after allocating [{}] node [{}] would have less than the required threshold of " + + "{} free (currently {} free, estimated shard size is {}), preventing allocation", + shardRouting, + node.nodeId(), + freeSpaceLowThresholdInByteSize, + freeSpaceInByteSize, + shardSizeInByteSize + ); + return allocation.decision( + Decision.NO, + NAME, + "allocating the shard to this node will bring the node above the low watermark cluster setting [%s=%s] " + + "and cause it to have less than the minimum required [%s] of addressable remote free space (free: [%s], estimated remote shard size: [%s])", + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), + freeSpaceLowThresholdInByteSize, + freeSpaceInByteSize, + shardSizeInByteSize + ); + } + + return allocation.decision( + Decision.YES, + NAME, + "enough available remote addressable space for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]", + freeSpaceInByteSize, + shardSizeInByteSize, + freeSpaceAfterAllocationInByteSize + ); + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (shardRouting.currentNodeId().equals(node.nodeId()) == false) { + throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]"); + } + + var nodeRoutingPool = getNodePool(node); + var shardRoutingPool = 
getShardPool(shardRouting, allocation); + if (nodeRoutingPool != REMOTE_CAPABLE || shardRoutingPool != REMOTE_CAPABLE) { + return Decision.ALWAYS; + } + + final Decision decision = earlyTerminate(node, allocation); + if (decision != null) { + return decision; + } + + final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation); + final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, true); + final long freeSpace = totalAddressableSpace - currentNodeRemoteShardSize; + + final long freeSpaceHighThreshold = calculateFreeSpaceHighThreshold(diskThresholdSettings, totalAddressableSpace); + + final ByteSizeValue freeSpaceHighThresholdInByteSize = new ByteSizeValue(freeSpaceHighThreshold); + final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace); + + if (freeSpace < freeSpaceHighThreshold) { + logger.warn( + "less than the required {} of free remote addressable space threshold left ({} free) on node [{}], shard cannot remain", + freeSpaceHighThresholdInByteSize, + freeSpaceInByteSize, + node.nodeId() + ); + return allocation.decision( + Decision.NO, + NAME, + "the shard cannot remain on this node because it is above the high watermark cluster setting [%s=%s] " + + "and there is less than the required [%s%%] free remote addressable space on node, actual free: [%s%%]", + CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), + freeSpaceHighThresholdInByteSize, + freeSpaceInByteSize + ); + } + + return allocation.decision( + Decision.YES, + NAME, + "there is enough remote addressable space on this node for the shard to remain, free: [%s]", + freeSpaceInByteSize + ); + } + + private long calculateFreeSpaceLowThreshold(DiskThresholdSettings diskThresholdSettings, long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdLow(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * 
percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdLow(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + + private long calculateFreeSpaceHighThreshold(DiskThresholdSettings diskThresholdSettings, long totalAddressableSpace) { + // Check for percentage-based threshold + double percentageThreshold = diskThresholdSettings.getFreeDiskThresholdHigh(); + if (percentageThreshold > 0) { + return (long) (totalAddressableSpace * percentageThreshold / 100.0); + } + + // Check for absolute bytes threshold + final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); + ByteSizeValue bytesThreshold = diskThresholdSettings.getFreeBytesThresholdHigh(); + if (bytesThreshold != null && bytesThreshold.getBytes() > 0) { + return bytesThreshold.getBytes() * (long) dataToFileCacheSizeRatio; + } + + // Default fallback + return 0; + } + + private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAllocation allocation, boolean subtractLeavingShards) { + final List remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false) + .filter( + shard -> shard.primary() + && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)) + && (subtractLeavingShards == false || shard.relocating() == false) + ) + .collect(Collectors.toList()); + + var remoteShardSize = 0L; + for (ShardRouting shard : remoteShardsOnNode) { + remoteShardSize += DiskThresholdDecider.getExpectedShardSize( + shard, + 0L, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + } + + return remoteShardSize; + } + + private long calculateTotalAddressableSpace(RoutingNode node, RoutingAllocation allocation) { 
+ ClusterInfo clusterInfo = allocation.clusterInfo(); + // TODO: Change the default value to 5 instead of 0 + final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio(); + final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null); + final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0; + return (long) dataToFileCacheSizeRatio * nodeCacheSize; + } + + private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation) { + // Always allow allocation if the decider is disabled + if (diskThresholdSettings.isWarmThresholdEnabled() == false) { + return allocation.decision(Decision.YES, NAME, "the warm disk threshold decider is disabled"); + } + + // Allow allocation regardless if only a single data node is available + if (enableForSingleDataNode == false && allocation.nodes().getDataNodes().size() <= 1) { + if (logger.isTraceEnabled()) { + logger.trace("only a single data node is present, allowing allocation"); + } + return allocation.decision(Decision.YES, NAME, "there is only a single data node present"); + } + + // Fail open there is no info available + final ClusterInfo clusterInfo = allocation.clusterInfo(); + if (clusterInfo == null) { + if (logger.isTraceEnabled()) { + logger.trace("cluster info unavailable for file cache threshold decider, allowing allocation."); + } + return allocation.decision(Decision.YES, NAME, "the cluster info is unavailable"); + } + + // Fail open if there are no file cache stats available + final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null); + if (fileCacheStats == null) { + if (logger.isTraceEnabled()) { + logger.trace("unable to get file cache stats for node [{}], allowing allocation", node.nodeId()); + } + return allocation.decision(Decision.YES, NAME, "File Cache Stat is unavailable"); + } + + double remoteDataRatio = fileCacheSettings.getRemoteDataRatio(); + 
if (remoteDataRatio == 0) { + return allocation.decision(Decision.YES, NAME, "Remote data ratio is set to 0, no limit on allocation"); + } + + return null; + } + +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 54b276237de28..4eb5b599ad79b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -70,7 +70,6 @@ import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider; -import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; @@ -331,11 +330,12 @@ public void apply(Settings value, Settings current, Settings previous) { ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, - DiskThresholdDecider.ENABLE_FOR_SINGLE_DATA_NODE, + DiskThresholdSettings.ENABLE_FOR_SINGLE_DATA_NODE, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, + 
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING, DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 8f888cedd5819..c620b03042007 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -62,6 +62,7 @@ import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.WarmDiskThresholdDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.ModuleTestCase; import org.opensearch.common.settings.ClusterSettings; @@ -253,6 +254,7 @@ public void testAllocationDeciderOrder() { SearchReplicaAllocationDecider.class, SameShardAllocationDecider.class, DiskThresholdDecider.class, + WarmDiskThresholdDecider.class, ThrottlingAllocationDecider.class, ShardsLimitAllocationDecider.class, AwarenessAllocationDecider.class, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 2f20c7c09a42b..ceb6a8ec4c087 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -94,7 +94,6 @@ import static 
org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; -import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -376,100 +375,6 @@ public void testDiskThresholdForRemoteShards() { assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0)); } - public void testFileCacheRemoteShardsDecisions() { - Settings diskSettings = Settings.builder() - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%") - .put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5) - .build(); - - // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available - final Map usages = new HashMap<>(); - usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 20)); // 80% used - usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 100)); // 0% used - - final Map shardSizes = new HashMap<>(); - shardSizes.put("[test][0][p]", 40L); - shardSizes.put("[test][1][p]", 40L); - shardSizes.put("[foo][0][p]", 10L); - - // First node has filecache size as 0, second has 1000, greater than the shard sizes. 
- Map fileCacheStatsMap = new HashMap<>(); - fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 0, 0, 0, 0, 0)); - fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); - - final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); - - Set defaultWithWarmRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); - defaultWithWarmRole.add(DiscoveryNodeRole.WARM_ROLE); - - DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings); - Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) - .build(); - - RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); - - DiscoveryNode discoveryNode1 = new DiscoveryNode( - "node1", - buildNewFakeTransportAddress(), - emptyMap(), - defaultWithWarmRole, - Version.CURRENT - ); - DiscoveryNode discoveryNode2 = new DiscoveryNode( - "node2", - buildNewFakeTransportAddress(), - emptyMap(), - defaultWithWarmRole, - Version.CURRENT - ); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(discoveryNode1).add(discoveryNode2).build(); - - ClusterState baseClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(initialRoutingTable) - .nodes(discoveryNodes) - .build(); - - // Two shards consuming each 80% of disk space while 70% is allowed, so shard 0 isn't allowed here - ShardRouting firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED); - ShardRouting secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", null, true, ShardRoutingState.STARTED); - RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting); - RoutingNode secondRoutingNode = new RoutingNode("node2", discoveryNode2); - - RoutingTable.Builder builder = 
RoutingTable.builder() - .add( - IndexRoutingTable.builder(firstRouting.index()) - .addIndexShard(new IndexShardRoutingTable.Builder(firstRouting.shardId()).addShard(firstRouting).build()) - .addIndexShard(new IndexShardRoutingTable.Builder(secondRouting.shardId()).addShard(secondRouting).build()) - ); - ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - RoutingAllocation routingAllocation = new RoutingAllocation( - null, - new RoutingNodes(clusterState), - clusterState, - clusterInfo, - null, - System.nanoTime() - ); - routingAllocation.debugDecision(true); - Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); - assertThat(decision.type(), equalTo(Decision.Type.YES)); - - decision = diskThresholdDecider.canAllocate(firstRouting, firstRoutingNode, routingAllocation); - assertThat(decision.type(), equalTo(Decision.Type.NO)); - - assertThat( - ((Decision.Single) decision).getExplanation(), - containsString("file cache limit reached - remote shard size will exceed configured safeguard ratio") - ); - - decision = diskThresholdDecider.canAllocate(firstRouting, secondRoutingNode, routingAllocation); - assertThat(decision.type(), equalTo(Decision.Type.YES)); - } - public void testDiskThresholdWithAbsoluteSizes() { Settings diskSettings = Settings.builder() .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) @@ -1365,7 +1270,7 @@ public void testForSingleDataNode() { public void testWatermarksEnabledForSingleDataNode() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.ENABLE_FOR_SINGLE_DATA_NODE.getKey(), true) + .put(DiskThresholdSettings.ENABLE_FOR_SINGLE_DATA_NODE.getKey(), true) .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") 
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%") diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java new file mode 100644 index 0000000000000..86fe1333715cd --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDeciderTests.java @@ -0,0 +1,441 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.store.remote.filecache.FileCacheSettings; +import org.opensearch.index.store.remote.filecache.FileCacheStats; +import org.opensearch.snapshots.EmptySnapshotsInfoService; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; + +public class WarmDiskThresholdDeciderTests extends OpenSearchAllocationTestCase { + + WarmDiskThresholdDecider makeDecider(Settings settings) { + return new WarmDiskThresholdDecider(settings, new ClusterSettings(settings, 
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + } + + /** + * Tests that the WarmDiskThresholdDecider returns a YES decision when allocating a shard with available space. + */ + public void testCanAllocateSufficientFreeSpace() { + Settings diskSettings = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "300b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "200b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100b") + .put(FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5.0) + .build(); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 2000L); // 2000 bytes + shardSizes.put("[test][0][r]", 2000L); + shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes + shardSizes.put("[test2][0][r]", 1000L); + + Map fileCacheStatsMap = new HashMap<>(); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + + final Map usages = new HashMap<>(); + final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders deciders = new AllocationDeciders( + new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), makeDecider(diskSettings))) + ); + + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; + }; + AllocationService strategy = new AllocationService( + deciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + cis, + EmptySnapshotsInfoService.INSTANCE + ); + + 
Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).addAsNew(metadata.index("test2")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + Set defaultWithWarmRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); + defaultWithWarmRole.add(DiscoveryNodeRole.WARM_ROLE); + + logger.info("--> adding two warm nodes"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithWarmRole)).add(newNode("node2", defaultWithWarmRole))) + .build(); + routingTable = strategy.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + logShardStates(clusterState); + assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(2)); + } + + /** + * Tests that the WarmDiskThresholdDecider returns a NO decision when allocating a shard with insufficient available space. 
+ */ + public void testCanAllocateInSufficientFreeSpace() { + Settings diskSettings = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8) + .put(FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5.0) + .build(); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 4000L); // 4000 bytes + shardSizes.put("[test][0][r]", 4000L); + shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes + shardSizes.put("[test2][0][r]", 1000L); + + Map fileCacheStatsMap = new HashMap<>(); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + + final Map usages = new HashMap<>(); + final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders deciders = new AllocationDeciders( + new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), makeDecider(diskSettings))) + ); + + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; + }; + AllocationService strategy = new AllocationService( + deciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + cis, + EmptySnapshotsInfoService.INSTANCE + ); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+ .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).addAsNew(metadata.index("test2")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + Set defaultWithWarmRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); + defaultWithWarmRole.add(DiscoveryNodeRole.WARM_ROLE); + + logger.info("--> adding two warm nodes"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithWarmRole)).add(newNode("node2", defaultWithWarmRole))) + .build(); + routingTable = strategy.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + logShardStates(clusterState); + // Assert that test index shards (P and R) are not allocated due to insufficient space + for (ShardRouting shardRouting : clusterState.getRoutingNodes().unassigned()) { + if (shardRouting.index().getName().equals("test")) { + assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED)); + } + } + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(2)); + } + + public void testCanRemainSufficientSpace() { + Settings settings = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "300b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "200b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100b") + .put(FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5.0) + 
+ .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + WarmDiskThresholdDecider decider = new WarmDiskThresholdDecider(settings, clusterSettings); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 2000L); // 2000 bytes + shardSizes.put("[test][0][r]", 2000L); + shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes + shardSizes.put("[test2][0][r]", 1000L); + + Map fileCacheStatsMap = new HashMap<>(); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + + final Map usages = new HashMap<>(); + final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + // Create a RoutingTable in which both copies of [test][0] and [test2][0] are STARTED, spread across node1 and node2 + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + // [test][0][P] : STARTED on node1, [test][0][R] : STARTED on node2 + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test").getIndex()); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED)); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node2", null, false, ShardRoutingState.STARTED)); + + // [test2][0][P] : STARTED on node2, [test2][0][R] : STARTED on node1 + IndexRoutingTable.Builder indexRoutingTableBuilder2 = IndexRoutingTable.builder(metadata.index("test2").getIndex()); +
indexRoutingTableBuilder2.addShard(TestShardRouting.newShardRouting("test2", 0, "node1", null, false, ShardRoutingState.STARTED)); + indexRoutingTableBuilder2.addShard(TestShardRouting.newShardRouting("test2", 0, "node2", null, true, ShardRoutingState.STARTED)); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + routingTableBuilder.add(indexRoutingTableBuilder2.build()); + RoutingTable routingTable = routingTableBuilder.build(); + + Set defaultWithWarmRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); + defaultWithWarmRole.add(DiscoveryNodeRole.WARM_ROLE); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithWarmRole)).add(newNode("node2", defaultWithWarmRole))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, clusterInfo, null, 0); + allocation.debugDecision(true); + + ShardRouting shard1 = routingTable.index("test").shard(0).primaryShard(); + ShardRouting shard2 = routingTable.index("test2").shard(0).primaryShard(); + + // Test canRemain decisions + assertEquals(Decision.Type.YES, decider.canRemain(shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(Decision.Type.YES, decider.canRemain(shard2, clusterState.getRoutingNodes().node("node2"), allocation).type()); + } + + public void testCanRemainInsufficientSpace() { + Settings settings = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "300b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "250b") + 
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100b") + .put(FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5.0) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + WarmDiskThresholdDecider decider = new WarmDiskThresholdDecider(settings, clusterSettings); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 4000L); // 4000 bytes + shardSizes.put("[test][0][r]", 4000L); + shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes + shardSizes.put("[test2][0][r]", 1000L); + + Map fileCacheStatsMap = new HashMap<>(); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + + final Map usages = new HashMap<>(); + final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + // [test][0][P]: STARTED on node1, [test][0][R]: STARTED on node2 + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test").getIndex()); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED)); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node2", null, false, ShardRoutingState.STARTED)); + + // [test2][0][P]: STARTED on node2, [test2][0][R]: STARTED on node1 + IndexRoutingTable.Builder indexRoutingTableBuilder2 = 
IndexRoutingTable.builder(metadata.index("test2").getIndex()); + indexRoutingTableBuilder2.addShard(TestShardRouting.newShardRouting("test2", 0, "node1", null, false, ShardRoutingState.STARTED)); + indexRoutingTableBuilder2.addShard(TestShardRouting.newShardRouting("test2", 0, "node2", null, true, ShardRoutingState.STARTED)); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + routingTableBuilder.add(indexRoutingTableBuilder2.build()); + RoutingTable routingTable = routingTableBuilder.build(); + + Set defaultWithWarmRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); + defaultWithWarmRole.add(DiscoveryNodeRole.WARM_ROLE); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithWarmRole)).add(newNode("node2", defaultWithWarmRole))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, clusterInfo, null, 0); + allocation.debugDecision(true); + + ShardRouting shard1 = routingTable.index("test").shard(0).primaryShard(); + ShardRouting shard2 = routingTable.index("test2").shard(0).primaryShard(); + + // Test canRemain decisions + assertEquals(Decision.Type.NO, decider.canRemain(shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(Decision.Type.YES, decider.canRemain(shard2, clusterState.getRoutingNodes().node("node2"), allocation).type()); + } + + public void testCanRemainSufficientSpaceAfterRelocation() { + Settings settings = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "300b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "200b") + 
+ .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100b") + .put(FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5.0) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + WarmDiskThresholdDecider decider = new WarmDiskThresholdDecider(settings, clusterSettings); + + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 3000L); // 3000 bytes + shardSizes.put("[test][0][r]", 3000L); + shardSizes.put("[test2][0][p]", 1000L); // 1000 bytes + shardSizes.put("[test2][0][r]", 1000L); + shardSizes.put("[test3][0][p]", 1500L); // 1500 bytes + + Map fileCacheStatsMap = new HashMap<>(); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + + final Map usages = new HashMap<>(); + final ClusterInfo clusterInfo = new DiskThresholdDeciderTests.DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test3").settings(warmIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + // [test][0][P]: STARTED on node1, [test][0][R]: STARTED on node2 + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test").getIndex()); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED)); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node2", null, false, ShardRoutingState.STARTED)); + + //
[test2][0][P]: STARTED on node2, [test2][0][R]: STARTED on node1 + IndexRoutingTable.Builder indexRoutingTableBuilder2 = IndexRoutingTable.builder(metadata.index("test2").getIndex()); + indexRoutingTableBuilder2.addShard(TestShardRouting.newShardRouting("test2", 0, "node1", null, false, ShardRoutingState.STARTED)); + indexRoutingTableBuilder2.addShard(TestShardRouting.newShardRouting("test2", 0, "node2", null, true, ShardRoutingState.STARTED)); + + // [test3][0][P]: RELOCATING from node1 + IndexRoutingTable.Builder indexRoutingTableBuilder3 = IndexRoutingTable.builder(metadata.index("test3").getIndex()); + indexRoutingTableBuilder3.addShard( + TestShardRouting.newShardRouting("test3", 0, "node1", "node2", true, ShardRoutingState.RELOCATING) + ); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + routingTableBuilder.add(indexRoutingTableBuilder2.build()); + routingTableBuilder.add(indexRoutingTableBuilder3.build()); + RoutingTable routingTable = routingTableBuilder.build(); + + Set defaultWithWarmRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); + defaultWithWarmRole.add(DiscoveryNodeRole.WARM_ROLE); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithWarmRole)).add(newNode("node2", defaultWithWarmRole))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, clusterInfo, null, 0); + allocation.debugDecision(true); + + ShardRouting shard1 = routingTable.index("test").shard(0).primaryShard(); + ShardRouting shard2 = routingTable.index("test2").shard(0).primaryShard(); + + // Test canRemain decisions with [test3][0][p] relocating from node1 + assertEquals(Decision.Type.YES, decider.canRemain(shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + 
assertEquals(Decision.Type.YES, decider.canRemain(shard2, clusterState.getRoutingNodes().node("node2"), allocation).type()); + } + + public void logShardStates(ClusterState state) { + RoutingNodes rn = state.getRoutingNodes(); + logger.info( + "--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}", + rn.shards(shard -> true).size(), + rn.shardsWithState(ShardRoutingState.UNASSIGNED).size(), + rn.shardsWithState(ShardRoutingState.INITIALIZING).size(), + rn.shardsWithState(ShardRoutingState.RELOCATING).size(), + rn.shardsWithState(ShardRoutingState.STARTED).size() + ); + logger.info( + "--> unassigned: {}, initializing: {}, relocating: {}, started: {}", + rn.shardsWithState(ShardRoutingState.UNASSIGNED), + rn.shardsWithState(ShardRoutingState.INITIALIZING), + rn.shardsWithState(ShardRoutingState.RELOCATING), + rn.shardsWithState(ShardRoutingState.STARTED) + ); + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index 2525aef903298..a449058014e54 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -1338,6 +1338,14 @@ public static Settings.Builder remoteIndexSettings(Version version) { return builder; } + public static Settings.Builder warmIndexSettings(Version version) { + Settings.Builder builder = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, version) + .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()); + return builder; + } + /** * Returns size random values */