From 9da594e8a81b3b63723be474647457dd7074463e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 15 Dec 2025 12:48:16 -0800 Subject: [PATCH 1/6] Add shard heap usage to ClusterInfo Relates ES-12882 --- .../index/shard/IndexShardIT.java | 64 +++++++++++++++++++ .../elasticsearch/cluster/ClusterInfo.java | 37 ++++++++++- .../cluster/ClusterInfoSimulator.java | 3 + .../cluster/EstimatedHeapUsageCollector.java | 20 +++++- .../cluster/InternalClusterInfoService.java | 24 +++++-- .../cluster/ShardAndIndexHeapUsage.java | 37 +++++++++++ .../shard_heap_usage_in_cluster_info.csv | 1 + .../resources/transport/upper_bounds/9.4.csv | 2 +- .../cluster/ClusterInfoTests.java | 10 +++ ...rnalClusterInfoServiceSchedulingTests.java | 6 ++ .../DesiredBalanceReconcilerTests.java | 14 +--- .../decider/DiskThresholdDeciderTests.java | 1 + .../DiskThresholdDeciderUnitTests.java | 52 +++++---------- .../ReactiveStorageDeciderService.java | 1 + 14 files changed, 214 insertions(+), 58 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java create mode 100644 server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 76d0d25b19e79..fec551197c268 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.ShardAndIndexHeapUsage; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -105,6 +106,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; import static java.util.Collections.emptySet; @@ -304,6 +306,49 @@ public void testHeapUsageEstimateIsPresent() { } } + public void testShardHeapUsagesArePresent() { + // Create some indices so we can later expect a precise number of (random) shard-level heap usage reports. + final int numIndices = randomIntBetween(1, 5); + final int numShards = randomIntBetween(1, 3); + final String indexPrefix = randomIdentifier(); + IntStream.range(0, numIndices).forEach(i -> { + final String indexName = indexPrefix + "_" + i; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build()); + }); + + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + ClusterInfoServiceUtils.refresh(clusterInfoService); + + Map estimatedShardHeapUsages = clusterInfoService.getClusterInfo().getEstimatedShardHeapUsages(); + assertNotNull(estimatedShardHeapUsages); + // No shard heap usage is reported because it is not yet enabled. + assertTrue(estimatedShardHeapUsages.isEmpty()); + + // Enable collection of heap usages for ClusterInfo. + updateClusterSettings( + Settings.builder() + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .build() + ); + + try { + ClusterInfoServiceUtils.refresh(clusterInfoService); + estimatedShardHeapUsages = clusterInfoService.getClusterInfo().getEstimatedShardHeapUsages(); + assertNotNull(estimatedShardHeapUsages); + assertEquals(estimatedShardHeapUsages.size(), numIndices * numShards); + for (var entry : estimatedShardHeapUsages.entrySet()) { + assertThat(entry.getValue().shardHeap(), greaterThanOrEqualTo(0L)); + assertThat(entry.getValue().indexHeap(), greaterThanOrEqualTo(0L)); + } + } finally { + updateClusterSettings( + Settings.builder() + .putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey()) + .build() + ); + } + } + public void testNodeWriteLoadsArePresent() { InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); @@ -970,6 +1015,25 @@ public void collectClusterHeapUsage(ActionListener> listener) .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong())) ); } + + @Override + public void collectShardHeapUsage(ActionListener> listener) { + ActionListener.completeWith( + listener, + () -> plugin.getClusterService() + .state() + .getRoutingNodes() + .stream() + .map(node -> node.started()) + .flatMap(nodeIt -> StreamSupport.stream(nodeIt.spliterator(), false)) + .collect( + Collectors.toUnmodifiableMap( + shardRouting -> shardRouting.shardId(), + shardRouting -> new ShardAndIndexHeapUsage(randomNonNegativeLong(), randomNonNegativeLong()) + ) + ) + ); + } } public static class BogusEstimatedHeapUsagePlugin extends Plugin implements ClusterPlugin { diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index cba8eee1dbc10..393ebfd0aec08 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -60,6 +60,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS private static final TransportVersion MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO = TransportVersion.fromName( "max_heap_size_per_node_in_cluster_info" ); + private static final TransportVersion SHARD_HEAP_USAGE_IN_CLUSTER_INFO = TransportVersion.fromName("shard_heap_usage_in_cluster_info"); private final Map leastAvailableSpaceUsage; private final Map mostAvailableSpaceUsage; @@ -68,6 +69,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS final Map dataPath; final Map reservedSpace; final Map estimatedHeapUsages; + final Map estimatedShardHeapUsages; final Map nodeUsageStatsForThreadPools; final Map shardWriteLoads; // max heap size per node ID @@ -75,7 +77,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS private final Map> shardToNodeIds; protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } /** @@ -88,6 +90,7 @@ protected ClusterInfo() { * @param dataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path * @param estimatedHeapUsages estimated heap usage broken down by node + * @param estimatedShardHeapUsages estimated heap usage broken down by {@link ShardId} * @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node * @see #shardIdentifierFromRouting * @param maxHeapSizePerNode node id to max heap size @@ -100,6 +103,7 @@ public ClusterInfo( Map dataPath, Map reservedSpace, Map estimatedHeapUsages, + Map estimatedShardHeapUsages, Map nodeUsageStatsForThreadPools, Map shardWriteLoads, Map maxHeapSizePerNode @@ -112,6 +116,7 @@ public ClusterInfo( dataPath, reservedSpace, estimatedHeapUsages, + estimatedShardHeapUsages, nodeUsageStatsForThreadPools, shardWriteLoads, maxHeapSizePerNode, @@ -127,6 +132,7 @@ private ClusterInfo( Map dataPath, Map reservedSpace, Map estimatedHeapUsages, + Map estimatedShardHeapUsages, Map nodeUsageStatsForThreadPools, Map shardWriteLoads, Map maxHeapSizePerNode, @@ -139,6 +145,7 @@ private ClusterInfo( this.dataPath = Map.copyOf(dataPath); this.reservedSpace = Map.copyOf(reservedSpace); this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); + this.estimatedShardHeapUsages = Map.copyOf(estimatedShardHeapUsages); this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools); this.shardWriteLoads = Map.copyOf(shardWriteLoads); this.maxHeapSizePerNode = Map.copyOf(maxHeapSizePerNode); @@ -172,6 +179,11 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.maxHeapSizePerNode = Map.of(); } + if (in.getTransportVersion().supports(SHARD_HEAP_USAGE_IN_CLUSTER_INFO)) { + this.estimatedShardHeapUsages = in.readImmutableMap(ShardId::new, ShardAndIndexHeapUsage::new); + } else { + this.estimatedShardHeapUsages = Map.of(); + } this.shardToNodeIds = computeShardToNodeIds(dataPath); } @@ -181,6 +193,7 @@ ClusterInfo updateWith( Map shardSizes, Map reservedSpace, Map estimatedHeapUsages, + Map estimatedShardHeapUsages, Map nodeUsageStatsForThreadPools ) { return new ClusterInfo( @@ -191,6 +204,7 @@ ClusterInfo updateWith( dataPath, reservedSpace, estimatedHeapUsages, + estimatedShardHeapUsages, nodeUsageStatsForThreadPools, shardWriteLoads, maxHeapSizePerNode, @@ -234,6 +248,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().supports(MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO)) { out.writeMap(this.maxHeapSizePerNode, StreamOutput::writeWriteable); } + if (out.getTransportVersion().supports(SHARD_HEAP_USAGE_IN_CLUSTER_INFO)) { + out.writeMap(this.estimatedShardHeapUsages); + } } /** @@ -314,8 +331,9 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.endObject(); // NodeAndPath }), endArray() // end "reserved_sizes" - // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads/maxHeapSizePerNode at this stage, - // to avoid committing to API payloads until the features are settled + // NOTE: We don't serialize + // estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads/maxHeapSizePerNode/estimatedShardHeapUsages + // at this stage, to avoid committing to API payloads until the features are settled ); } @@ -330,6 +348,10 @@ public Map getEstimatedHeapUsages() { return estimatedHeapUsages; } + public Map getEstimatedShardHeapUsages() { + return estimatedShardHeapUsages; + } + /** * Returns a map containing thread pool usage stats for each node, keyed by node ID. */ @@ -432,6 +454,7 @@ public boolean equals(Object o) { && dataPath.equals(that.dataPath) && reservedSpace.equals(that.reservedSpace) && estimatedHeapUsages.equals(that.estimatedHeapUsages) + && estimatedShardHeapUsages.equals(that.estimatedShardHeapUsages) && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools) && shardWriteLoads.equals(that.shardWriteLoads) && maxHeapSizePerNode.equals(that.maxHeapSizePerNode); @@ -447,6 +470,7 @@ public int hashCode() { dataPath, reservedSpace, estimatedHeapUsages, + estimatedShardHeapUsages, nodeUsageStatsForThreadPools, shardWriteLoads, maxHeapSizePerNode @@ -571,6 +595,7 @@ public static class Builder { private Map dataPath = Map.of(); private Map reservedSpace = Map.of(); private Map estimatedHeapUsages = Map.of(); + private Map estimatedShardHeapUsages = Map.of(); private Map nodeUsageStatsForThreadPools = Map.of(); private Map shardWriteLoads = Map.of(); private Map maxHeapSizePerNode = Map.of(); @@ -584,6 +609,7 @@ public ClusterInfo build() { dataPath, reservedSpace, estimatedHeapUsages, + estimatedShardHeapUsages, nodeUsageStatsForThreadPools, shardWriteLoads, maxHeapSizePerNode @@ -625,6 +651,11 @@ public Builder estimatedHeapUsages(Map estimatedHeap return this; } + public Builder estimatedShardHeapUsages(Map estimatedShardHeapUsages) { + this.estimatedShardHeapUsages = estimatedShardHeapUsages; + return this; + } + public Builder nodeUsageStatsForThreadPools(Map nodeUsageStatsForThreadPools) { this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools; return this; diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 68a82f64b8039..56a59b1d753ab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -39,6 +39,7 @@ public class ClusterInfoSimulator { private final Map mostAvailableSpaceUsage; private final CopyOnFirstWriteMap shardSizes; private final Map estimatedHeapUsages; + private final Map estimatedShardHeapUsages; private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator; public ClusterInfoSimulator(RoutingAllocation allocation) { @@ -47,6 +48,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.mostAvailableSpaceUsage = getAdjustedDiskSpace(allocation, allocation.clusterInfo().getNodeMostAvailableDiskUsages()); this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); + this.estimatedShardHeapUsages = allocation.clusterInfo().getEstimatedShardHeapUsages(); this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); } @@ -192,6 +194,7 @@ public ClusterInfo getClusterInfo() { shardSizes.toImmutableMap(), Map.of(), estimatedHeapUsages, + estimatedShardHeapUsages, shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/EstimatedHeapUsageCollector.java b/server/src/main/java/org/elasticsearch/cluster/EstimatedHeapUsageCollector.java index 36f7b95c92c7f..2e798a4fcad89 100644 --- a/server/src/main/java/org/elasticsearch/cluster/EstimatedHeapUsageCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/EstimatedHeapUsageCollector.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.shard.ShardId; import java.util.Map; @@ -25,7 +26,17 @@ public interface EstimatedHeapUsageCollector { /** * This will be used when there is no EstimatedHeapUsageCollector available */ - EstimatedHeapUsageCollector EMPTY = listener -> listener.onResponse(Map.of()); + EstimatedHeapUsageCollector EMPTY = new EstimatedHeapUsageCollector() { + @Override + public void collectClusterHeapUsage(ActionListener> listener) { + listener.onResponse(Map.of()); + } + + @Override + public void collectShardHeapUsage(ActionListener> listener) { + listener.onResponse(Map.of()); + } + }; /** * Collect the estimated heap usage for every node in the cluster @@ -33,4 +44,11 @@ public interface EstimatedHeapUsageCollector { * @param listener The listener which will receive the results */ void collectClusterHeapUsage(ActionListener> listener); + + /** + * Collects the estimated heap usage for every shard in the cluster. + * + * @param listener The listener which will receive the results + */ + void collectShardHeapUsage(ActionListener> listener); } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 01db136ed69b3..831cda753716c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -211,6 +211,7 @@ private class AsyncRefresh { private volatile Map mostAvailableSpaceUsages; private volatile Map maxHeapPerNode; private volatile Map estimatedHeapUsagePerNode; + private volatile Map estimatedHeapUsagePerShard; private volatile Map nodeThreadPoolUsageStatsPerNode; private volatile IndicesStatsSummary indicesStatsSummary; @@ -227,7 +228,7 @@ void execute() { try (var ignoredRefs = fetchRefs) { maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled()); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); - maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); + maybeFetchEstimatedHeapUsage(estimatedHeapThresholdEnabled); fetchNodesUsageStatsForThreadPools(); } } @@ -256,14 +257,15 @@ private void maybeFetchNodeStats(boolean shouldFetch) { } } - private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { + private void maybeFetchEstimatedHeapUsage(boolean shouldFetch) { if (shouldFetch) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodesEstimatedHeapUsage(); + fetchEstimatedHeapUsage(); } } else { logger.trace("skipping collecting estimated heap usage from cluster, notifying listeners with empty estimated heap usage"); estimatedHeapUsagePerNode = Map.of(); + estimatedHeapUsagePerShard = Map.of(); } } @@ -288,7 +290,7 @@ public void onFailure(Exception e) { } } - private void fetchNodesEstimatedHeapUsage() { + private void fetchEstimatedHeapUsage() { estimatedHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(Map currentEstimatedHeapUsages) { @@ -301,6 +303,19 @@ public void onFailure(Exception e) { estimatedHeapUsagePerNode = Map.of(); } }, fetchRefs.acquire())); + + estimatedHeapUsageCollector.collectShardHeapUsage(ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(Map currentEstimatedHeapUsages) { + estimatedHeapUsagePerShard = currentEstimatedHeapUsages; + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to fetch heap usage for shards", e); + estimatedHeapUsagePerShard = Map.of(); + } + }, fetchRefs.acquire())); } private void fetchIndicesStats() { @@ -488,6 +503,7 @@ private ClusterInfo updateAndGetCurrentClusterInfo() { indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, estimatedHeapUsages, + estimatedHeapUsagePerShard, nodeThreadPoolUsageStatsPerNode, indicesStatsSummary.shardWriteLoads(), maxHeapPerNode diff --git a/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java b/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java new file mode 100644 index 0000000000000..431a647f9ba57 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Tracks a shard's heap usage, as well as any index-level heap usage overhead that should be deduplicated per node. + */ +public record ShardAndIndexHeapUsage(long shardHeap, long indexHeap) implements Writeable { + + public ShardAndIndexHeapUsage(StreamInput in) throws IOException { + this(in.readLong(), in.readLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(this.shardHeap); + out.writeLong(this.indexHeap); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{shardHeap=" + shardHeap + ", indexHeap=" + indexHeap + "}"; + } +} diff --git a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv new file mode 100644 index 0000000000000..806b9c40d5d94 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv @@ -0,0 +1 @@ +9254000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index 074a87dc224e5..719d3af2dd799 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -searchable_snapshots_dlm,9253000 +shard_heap_usage_in_cluster_info,9254000 diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 2a0e74fa6b926..80f1c2fc65e72 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -45,6 +45,7 @@ public static ClusterInfo randomClusterInfo() { randomRoutingToDataPath(), randomReservedSpace(), randomNodeHeapUsage(), + randomShardHeapUsages(), randomNodeUsageStatsForThreadPools(), randomShardWriteLoad(), randomMaxHeapSizes() @@ -69,6 +70,15 @@ private static Map randomMaxHeapSizes() { return nodeMaxHeapSizes; } + private static Map randomShardHeapUsages() { + int numEntries = randomIntBetween(0, 128); + Map shardHeapUsageBuilder = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + shardHeapUsageBuilder.put(randomShardId(), new ShardAndIndexHeapUsage(randomLong(), randomLong())); + } + return shardHeapUsageBuilder; + } + private static Map randomNodeHeapUsage() { int numEntries = randomIntBetween(0, 128); Map nodeHeapUsage = new HashMap<>(numEntries); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index d89f932d5b098..8309adb0345a0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ClusterServiceUtils; @@ -224,6 +225,11 @@ private static class StubEstimatedEstimatedHeapUsageCollector implements Estimat public void collectClusterHeapUsage(ActionListener> listener) { listener.onResponse(Map.of()); } + + @Override + public void collectShardHeapUsage(ActionListener> listener) { + listener.onResponse(Map.of()); + } } private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index fb404ecdde8be..ba5e1e0b11ba9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -610,19 +610,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { shardIdentifierFromRouting(clusterState.routingTable().shardRoutingTable("index-existing", 0).primaryShard()), existingShardSize ); - final var clusterInfo = new ClusterInfo( - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - shardSizesBuilder.build(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of() - ); - + final var clusterInfo = ClusterInfo.builder().shardSizes(shardSizesBuilder.build()).build(); final var restoredShardSize = randomNonNegativeLong(); final var snapshotSizesBuilder = ImmutableOpenMap.builder(); snapshotSizesBuilder.put( diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 29256dc2dac9f..a76725ecc1ede 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1416,6 +1416,7 @@ static class DevNullClusterInfo extends ClusterInfo { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 37510f0774312..92e802602672b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -102,18 +102,11 @@ public void testCanAllocateUsesMaxAvailableSpace() { // this is weird and smells like a bug! it should be up to 20%? mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, randomIntBetween(0, 10))); - final ClusterInfo clusterInfo = new ClusterInfo( - leastAvailableUsages, - mostAvailableUsage, - Map.of("[test][0][p]", 10L), // 10 bytes, - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of() - ); + final ClusterInfo clusterInfo = ClusterInfo.builder() + .leastAvailableSpaceUsage(leastAvailableUsages) + .mostAvailableSpaceUsage(mostAvailableUsage) + .shardSizes(Map.of("[test][0][p]", 10L)) + .build(); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState, @@ -177,18 +170,11 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo // way bigger than available space final long shardSize = randomLongBetween(totalBytes + 10, totalBytes * 10); - ClusterInfo clusterInfo = new ClusterInfo( - leastAvailableUsages, - mostAvailableUsage, - Map.of("[test][0][p]", shardSize), - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of() - ); + ClusterInfo clusterInfo = ClusterInfo.builder() + .leastAvailableSpaceUsage(leastAvailableUsages) + .mostAvailableSpaceUsage(mostAvailableUsage) + .shardSizes(Map.of("[test][0][p]", shardSize)) + .build(); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState, @@ -326,18 +312,12 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) { shardSizes.put("[test][1][p]", exactFreeSpaceForHighWatermark); shardSizes.put("[test][2][p]", exactFreeSpaceForHighWatermark); - final ClusterInfo clusterInfo = new ClusterInfo( - leastAvailableUsages, - mostAvailableUsage, - shardSizes, - Map.of(), - shardRoutingMap, - Map.of(), - Map.of(), - Map.of(), - Map.of(), - Map.of() - ); + final ClusterInfo clusterInfo = ClusterInfo.builder() + .leastAvailableSpaceUsage(leastAvailableUsages) + .mostAvailableSpaceUsage(mostAvailableUsage) + .shardSizes(shardSizes) + .dataPath(shardRoutingMap) + .build(); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState, diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 89aa01a6a58ad..735075695c6a0 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -961,6 +961,7 @@ private ExtendedClusterInfo(Map extraShardSizes, ClusterInfo info) Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); this.delegate = info; From 6f3270d795143403721ef4b130f0b8d4a9c9d513 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 19 Dec 2025 16:22:31 -0800 Subject: [PATCH 2/6] refactor names to indicate bytes --- .../org/elasticsearch/index/shard/IndexShardIT.java | 4 ++-- .../cluster/ShardAndIndexHeapUsage.java | 13 +++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index fec551197c268..eca295f5fa3ad 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -337,8 +337,8 @@ public void testShardHeapUsagesArePresent() { assertNotNull(estimatedShardHeapUsages); assertEquals(estimatedShardHeapUsages.size(), numIndices * numShards); for (var entry : estimatedShardHeapUsages.entrySet()) { - assertThat(entry.getValue().shardHeap(), greaterThanOrEqualTo(0L)); - assertThat(entry.getValue().indexHeap(), greaterThanOrEqualTo(0L)); + assertThat(entry.getValue().shardHeapUsageBytes(), greaterThanOrEqualTo(0L)); + assertThat(entry.getValue().indexHeapUsageBytes(), greaterThanOrEqualTo(0L)); } } finally { updateClusterSettings( diff --git a/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java b/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java index 431a647f9ba57..2f52919030cf8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java +++ b/server/src/main/java/org/elasticsearch/cluster/ShardAndIndexHeapUsage.java @@ -18,7 +18,7 @@ /** * Tracks a shard's heap usage, as well as any index-level heap usage overhead that should be deduplicated per node. */ -public record ShardAndIndexHeapUsage(long shardHeap, long indexHeap) implements Writeable { +public record ShardAndIndexHeapUsage(long shardHeapUsageBytes, long indexHeapUsageBytes) implements Writeable { public ShardAndIndexHeapUsage(StreamInput in) throws IOException { this(in.readLong(), in.readLong()); @@ -26,12 +26,17 @@ public ShardAndIndexHeapUsage(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(this.shardHeap); - out.writeLong(this.indexHeap); + out.writeLong(this.shardHeapUsageBytes); + out.writeLong(this.indexHeapUsageBytes); } @Override public String toString() { - return getClass().getSimpleName() + "{shardHeap=" + shardHeap + ", indexHeap=" + indexHeap + "}"; + return getClass().getSimpleName() + + "{shardHeapUsageBytes=" + + shardHeapUsageBytes + + ", indexHeapUsageBytes=" + + indexHeapUsageBytes + + "}"; } } From 1ff495e2b6dae94cff0f11bd36b9f7f5510db07b Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 22 Dec 2025 11:05:27 -0800 Subject: [PATCH 3/6] change to non-negative random longs in testing; add a few checks in InternalClusterInfoServiceSchedulingTests to verify the new method is called --- .../test/java/org/elasticsearch/cluster/ClusterInfoTests.java | 2 +- .../cluster/InternalClusterInfoServiceSchedulingTests.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 80f1c2fc65e72..226611830e207 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -74,7 +74,7 @@ private static Map randomShardHeapUsages() { int numEntries = randomIntBetween(0, 128); Map shardHeapUsageBuilder = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { - shardHeapUsageBuilder.put(randomShardId(), new ShardAndIndexHeapUsage(randomLong(), randomLong())); + shardHeapUsageBuilder.put(randomShardId(), new ShardAndIndexHeapUsage(randomNonNegativeLong(), randomNonNegativeLong())); } return shardHeapUsageBuilder; } diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 8309adb0345a0..384b37054e6e4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -154,6 +154,7 @@ public void reroute(String reason, Priority priority, ActionListener liste // should have run two client requests: nodes stats request and indices stats request assertThat(client.requestCount, equalTo(initialRequestCount + 2)); verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should have polled for heap usage + verify(mockEstimatedHeapUsageCollector).collectShardHeapUsage(any()); verify(nodeUsageStatsForThreadPoolsCollector).collectUsageStats(any(), any(), any()); } @@ -202,6 +203,7 @@ public void reroute(String reason, Priority priority, ActionListener liste deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval + verify(mockEstimatedHeapUsageCollector).collectShardHeapUsage(any()); verify(nodeUsageStatsForThreadPoolsCollector).collectUsageStats(any(), any(), any()); } From 3f122a37748bfd842caa0160ad7ae925ff218b19 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 23 Dec 2025 10:31:54 -0800 Subject: [PATCH 4/6] fix transport version --- .../definitions/referable/shard_heap_usage_in_cluster_info.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.4.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv index 806b9c40d5d94..7e23f3c7e5dd4 100644 --- a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv +++ b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv @@ -1 +1 @@ -9254000 +9255000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index 0a5a6afb6a3fe..64cd7907c8cde 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -esql_long_ranges,9254000 +shard_heap_usage_in_cluster_info,9255000 From e56e741897d785e17900f7d5f093f4088070e78a Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 15 Jan 2026 10:29:13 -0800 Subject: [PATCH 5/6] fix transport version --- .../definitions/referable/shard_heap_usage_in_cluster_info.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.4.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv index 7e23f3c7e5dd4..f743cf1474056 100644 --- a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv +++ b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv @@ -1 +1 @@ -9255000 +9261000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index d3954517fe8c3..3b3954cfad1d0 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -get_inference_fields_action_as_indices_action,9260000 +shard_heap_usage_in_cluster_info,9261000 From d4f85acc0a302bcbb140e27ffcfb3007e7dd4614 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 2 Feb 2026 13:48:58 -0800 Subject: [PATCH 6/6] fix after merge --- .../definitions/referable/shard_heap_usage_in_cluster_info.csv | 2 +- .../routing/allocation/decider/DiskThresholdDeciderTests.java | 2 +- .../allocation/decider/DiskThresholdDeciderUnitTests.java | 1 - .../autoscaling/storage/ReactiveStorageDeciderService.java | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv index d853d4ab05ed3..f43c057011643 100644 --- a/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv +++ b/server/src/main/resources/transport/definitions/referable/shard_heap_usage_in_cluster_info.csv @@ -1 +1 @@ -9262000 +9271000 diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 0cd8f765d48df..adb886b40785e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1449,7 +1449,7 @@ static class DevNullClusterInfo extends ClusterInfo { Map.of(), Map.of(), Map.of(), - Map.of() + Map.of(), Set.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 9b25d4363ba22..4948f58bd12e2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -48,7 +48,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Set; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting; diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index f2712f1e6723d..db200d842598e 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -962,7 +962,7 @@ private ExtendedClusterInfo(Map extraShardSizes, ClusterInfo info) Map.of(), Map.of(), Map.of(), - Map.of() + Map.of(), Set.of() ); this.delegate = info;