Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.WarmDiskThresholdDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -393,6 +394,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new SearchReplicaAllocationDecider());
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new WarmDiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
 * Boolean setting registered under the key
 * {@code cluster.routing.allocation.disk.warm_threshold_enabled}.
 * Defaults to {@code true} and is declared with the {@code Dynamic} and
 * {@code NodeScope} properties. Its value is cached in the
 * {@code warmThresholdEnabled} field and kept current through a settings
 * update consumer.
 */
// NOTE(review): presumably toggles the checks done by WarmDiskThresholdDecider — confirm against that class.
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING = Setting.boolSetting(
"cluster.routing.allocation.disk.warm_threshold_enabled",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
 * Boolean setting registered under the key
 * {@code cluster.routing.allocation.disk.watermark.enable_for_single_data_node}.
 * Defaults to {@code false}. It is declared with {@code NodeScope} only
 * (no {@code Dynamic} property).
 */
// NOTE(review): presumably controls whether disk watermarks apply when the cluster has a single data node — confirm with the deciders that read it.
public static final Setting<Boolean> ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting(
"cluster.routing.allocation.disk.watermark.enable_for_single_data_node",
false,
Setting.Property.NodeScope
);
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = new Setting<>(
"cluster.routing.allocation.disk.watermark.low",
"85%",
Expand Down Expand Up @@ -113,6 +124,7 @@
private volatile boolean includeRelocations;
private volatile boolean createIndexBlockAutoReleaseEnabled;
private volatile boolean enabled;
private volatile boolean warmThresholdEnabled;
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
Expand All @@ -139,13 +151,18 @@
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
this.warmThresholdEnabled = CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
this.createIndexBlockAutoReleaseEnabled = CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING,
this::setWarmThresholdEnabled
);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE, this::setCreateIndexBlockAutoReleaseEnabled);
}

Expand Down Expand Up @@ -311,6 +328,10 @@
this.enabled = enabled;
}

/**
 * Stores the latest value of the warm disk threshold flag.
 * Registered as the settings update consumer for
 * {@code CLUSTER_ROUTING_ALLOCATION_WARM_DISK_THRESHOLD_ENABLED_SETTING}.
 *
 * @param warmThresholdEnabled the new value of the flag
 */
private void setWarmThresholdEnabled(boolean warmThresholdEnabled) {
    this.warmThresholdEnabled = warmThresholdEnabled;
}

Check warning on line 333 in server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdSettings.java#L332-L333

Added lines #L332 - L333 were not covered by tests

private void setLowWatermark(String lowWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.lowWatermarkRaw = lowWatermark;
Expand Down Expand Up @@ -390,6 +411,10 @@
return enabled;
}

/**
 * Reports the currently cached value of the warm disk threshold flag.
 *
 * @return {@code true} if the warm disk threshold is enabled, {@code false} otherwise
 */
public boolean isWarmThresholdEnabled() {
    return this.warmThresholdEnabled;
}

/**
 * Returns the currently cached reroute interval.
 *
 * @return the value last read from, or pushed by an update to,
 *         {@code CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING}
 */
public TimeValue getRerouteInterval() {
    return this.rerouteInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,24 @@
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.ENABLE_FOR_SINGLE_DATA_NODE;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -101,12 +98,6 @@ public class DiskThresholdDecider extends AllocationDecider {

public static final String NAME = "disk_threshold";

/**
 * Boolean setting registered under the key
 * {@code cluster.routing.allocation.disk.watermark.enable_for_single_data_node}.
 * Defaults to {@code false} and is declared with {@code NodeScope} only.
 */
// NOTE(review): a setting with the same name and key is also visible in DiskThresholdSettings and is
// statically imported by this file — confirm that only one declaration is meant to remain.
public static final Setting<Boolean> ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting(
"cluster.routing.allocation.disk.watermark.enable_for_single_data_node",
false,
Setting.Property.NodeScope
);

private final DiskThresholdSettings diskThresholdSettings;
private final boolean enableForSingleDataNode;
private final FileCacheSettings fileCacheSettings;
Expand Down Expand Up @@ -176,44 +167,9 @@ public static long sizeOfRelocatingShards(
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();

/*
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
// Remote-capable shard on a remote-capable node: WarmDiskThresholdDecider makes the decision for this case
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
// we don't need to check the ratio
if (dataToFileCacheSizeRatio <= 0.1f) {
return Decision.YES;
}

final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
.map(ShardRouting::getExpectedShardSize)
.mapToLong(Long::longValue)
.sum();

final long shardSize = getExpectedShardSize(
shardRouting,
0L,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);

final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
);
}
return Decision.YES;
return Decision.ALWAYS;
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.NO;
}
Expand Down Expand Up @@ -481,12 +437,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}

/*
The following block prevents movement for remote shards since they do not use the local storage as
the primary source of data storage.
*/
// Remote-capable shard on a remote-capable node: WarmDiskThresholdDecider makes the decision for this case
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.ALWAYS;
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.NO;
}

final ClusterInfo clusterInfo = allocation.clusterInfo();
Expand Down
Loading
Loading