Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2d60021
Add nodeHeapUsage field to ClusterInfo
nicktindall Jun 2, 2025
542c256
Populate nodesHeapUsage, make HeapUsageSupplier pluggable
nicktindall Jun 2, 2025
356beb5
Fix tests
nicktindall Jun 2, 2025
81fd063
Allow deferred creation of HeapUsageSupplier
nicktindall Jun 3, 2025
bc0682c
Default HeapUsageSupplier
nicktindall Jun 3, 2025
747b5a2
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 3, 2025
13d1de8
Clarify that heap usage is a minimum
nicktindall Jun 3, 2025
f4d9db5
Test that InternalClusterInfoService polls for heap usage
nicktindall Jun 3, 2025
bf51e85
Test that getNodesHeapUsage returns heap usage
nicktindall Jun 3, 2025
c47c0ca
More caveats for #getNodesHeapUsage()
nicktindall Jun 3, 2025
23eb8e6
Remove HeapUsageSupplier from ClusterPlugin interface
nicktindall Jun 4, 2025
887bcaf
Swap free for used in HeapUsage
nicktindall Jun 4, 2025
7275acb
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 4, 2025
85fd019
Don't report heap usage in ClusterInfo serialization
nicktindall Jun 4, 2025
f112a3b
Fix tests
nicktindall Jun 4, 2025
3a1ada2
Only skip disk usage fetches when disk usage is disabled
nicktindall Jun 4, 2025
8fa587f
HeapUsage -> ShardHeapUsage
nicktindall Jun 4, 2025
6d4b204
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 4, 2025
2c42a82
icis -> internalClusterInfoService
nicktindall Jun 4, 2025
58402bd
diskUsage -> shardHeapUsage
nicktindall Jun 4, 2025
63bbea8
Note about not serializing shardHeapUsages
nicktindall Jun 4, 2025
0cacdc7
Remove unused serialization interface/methods
nicktindall Jun 4, 2025
dd73d37
Additional assertions
nicktindall Jun 4, 2025
765ade8
Clear shardHeapUsages on failure to fetch
nicktindall Jun 4, 2025
e26b62f
Fix naming
nicktindall Jun 4, 2025
55637b6
Restore + test percentage methods
nicktindall Jun 5, 2025
f4b90b5
Load ShardHeapUsageSupplier via SPI
nicktindall Jun 5, 2025
0789fef
Move SPI config to internalClusterTest
nicktindall Jun 5, 2025
2d475c8
Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info
nicktindall Jun 5, 2025
f56f00e
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 6, 2025
7b5bf95
*Supplier -> *Collector
nicktindall Jun 7, 2025
08a5ca3
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 7, 2025
09ca9dc
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 8, 2025
cd6b7e9
Don't assert estimate <= max heap
nicktindall Jun 10, 2025
5e2cb9f
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 10, 2025
c529194
Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info
nicktindall Jun 10, 2025
d831194
Use node stats to retrieve max heap size
nicktindall Jun 11, 2025
b8387bb
[CI] Auto commit changes from spotless
Jun 11, 2025
26dba4d
Fix build
nicktindall Jun 11, 2025
f15fca2
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.HeapUsage;
import org.elasticsearch.cluster.HeapUsageSupplier;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -61,6 +63,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.DummyShardLock;
Expand All @@ -81,6 +84,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -89,6 +93,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
Expand All @@ -110,12 +115,13 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class IndexShardIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(InternalSettingsPlugin.class);
return pluginList(InternalSettingsPlugin.class, BogusHeapUsagePlugin.class);
}

public void testLockTryingToDelete() throws Exception {
Expand Down Expand Up @@ -253,6 +259,20 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException {
assertThat(dataSetSize.get(), greaterThan(0L));
}

public void testHeapUsageEstimateIsPresent() {
    // Force a refresh so the ClusterInfo carries up-to-date heap usage estimates
    final InternalClusterInfoService infoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
    ClusterInfoServiceUtils.refresh(infoService);
    final ClusterState clusterState = getInstanceFromNode(ClusterService.class).state();
    final Map<String, HeapUsage> usagesByNodeId = infoService.getClusterInfo().getNodesHeapUsage();
    assertNotNull(usagesByNodeId);
    // Every node in the cluster state should have a corresponding heap usage entry
    assertEquals(clusterState.nodes().size(), usagesByNodeId.size());
    for (DiscoveryNode discoveryNode : clusterState.nodes()) {
        assertTrue(usagesByNodeId.containsKey(discoveryNode.getId()));
        final HeapUsage nodeUsage = usagesByNodeId.get(discoveryNode.getId());
        assertThat(nodeUsage.freeBytes(), lessThanOrEqualTo(nodeUsage.totalBytes()));
    }
}

public void testIndexCanChangeCustomDataPath() throws Exception {
final String index = "test-custom-data-path";
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
Expand Down Expand Up @@ -795,4 +815,37 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices
assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES);
}
}

/**
 * Test-only {@link HeapUsageSupplier} that fabricates a random heap usage value
 * for every node currently present in the cluster state.
 */
private static class BogusHeapUsageSupplier implements HeapUsageSupplier {

    private final ClusterService clusterService;

    private BogusHeapUsageSupplier(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    @Override
    public void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener) {
        ActionListener.completeWith(listener, () -> {
            final Map<String, HeapUsage> usagesByNodeId = clusterService.state()
                .nodes()
                .stream()
                .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, BogusHeapUsageSupplier::randomHeapUsage));
            return usagesByNodeId;
        });
    }

    /** Builds a {@link HeapUsage} with a random total and a used value no greater than the total. */
    private static HeapUsage randomHeapUsage(DiscoveryNode node) {
        final long totalHeapBytes = randomNonNegativeLong();
        final long usedHeapBytes = (long) (randomFloat() * totalHeapBytes);
        return new HeapUsage(node.getId(), node.getName(), totalHeapBytes, usedHeapBytes);
    }
}

/**
 * Plugin that wires a {@link BogusHeapUsageSupplier} into the node so that heap
 * usage estimates are available to the cluster info service during this test.
 */
public static class BogusHeapUsagePlugin extends Plugin implements ClusterPlugin {

    public BogusHeapUsagePlugin() {}

    @Override
    public Collection<?> createComponents(PluginServices services) {
        final var heapUsageSupplier = new BogusHeapUsageSupplier(services.clusterService());
        services.allocationService().setHeapUsageSupplier(heapUsageSupplier);
        return List.of();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ static TransportVersion def(int id) {
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES = def(9_087_0_00);
public static final TransportVersion JOIN_ON_ALIASES = def(9_088_0_00);
public static final TransportVersion ILM_ADD_SKIP_SETTING = def(9_089_0_00);
public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_090_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
27 changes: 25 additions & 2 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<ShardId, Long> shardDataSetSizes;
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, HeapUsage> nodesHeapUsage;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add comment to say this field is deliberately ignored in toXContentChunked so that another reader knows this is intentional and not a bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 63bbea8


protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -71,6 +72,7 @@ protected ClusterInfo() {
* @param shardDataSetSizes a shard id to data set size in bytes mapping per shard
* @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param nodesHeapUsage heap usage broken down by node
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
Expand All @@ -79,14 +81,16 @@ public ClusterInfo(
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizes,
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, HeapUsage> nodesHeapUsage
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
this.shardSizes = Map.copyOf(shardSizes);
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.nodesHeapUsage = Map.copyOf(nodesHeapUsage);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -98,6 +102,11 @@ public ClusterInfo(StreamInput in) throws IOException {
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.nodesHeapUsage = in.readImmutableMap(HeapUsage::new);
} else {
this.nodesHeapUsage = Map.of();
}
}

@Override
Expand All @@ -112,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
}
out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.nodesHeapUsage, StreamOutput::writeWriteable);
}
}

/**
Expand Down Expand Up @@ -195,6 +207,17 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
);
}

/**
 * A mapping from node id to an <i>estimate</i> of that node's heap usage, covering only the
 * nodes for which an estimate was obtained. Because these values are minimum estimates they
 * can show that a node does NOT have spare heap capacity, but never prove that it does.
 * The map may be empty or partial at any time, and may still include entries for nodes that
 * have since left the cluster.
 */
public Map<String, HeapUsage> getNodesHeapUsage() {
    return this.nodesHeapUsage;
}

/**
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
* Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public ClusterInfo getClusterInfo() {
shardSizes.toImmutableMap(),
shardDataSetSizes,
dataPath,
Map.of(),
Map.of()
);
}
Expand Down
74 changes: 74 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/HeapUsage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Record representing the heap usage for a single cluster node
*/
public record HeapUsage(String nodeId, String nodeName, long totalBytes, long usedBytes) implements ToXContentFragment, Writeable {

public HeapUsage {
assert usedBytes <= totalBytes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe also assert non-negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in dd73d37

}

/**
 * Reads a {@link HeapUsage} from the wire; field order must match {@link #writeTo}.
 */
public HeapUsage(StreamInput in) throws IOException {
    this(in.readString(), in.readString(), in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    // Field order must match the StreamInput constructor above
    out.writeString(nodeId);
    out.writeString(nodeName);
    out.writeVLong(totalBytes);
    out.writeVLong(usedBytes);
}

/**
 * Emits the node name plus total/used/free heap values (everything except the node id).
 */
public XContentBuilder toShortXContent(XContentBuilder builder) throws IOException {
    builder.field("node_name", nodeName);
    builder.humanReadableField("total_heap_bytes", "total", ByteSizeValue.ofBytes(totalBytes));
    builder.humanReadableField("used_heap_bytes", "used", ByteSizeValue.ofBytes(usedBytes));
    builder.humanReadableField("free_heap_bytes", "free", ByteSizeValue.ofBytes(freeBytes()));
    builder.field("free_heap_percent", truncatePercent(freeHeapAsPercentage()));
    builder.field("used_heap_percent", truncatePercent(usedHeapAsPercentage()));
    return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    // Full form: the short form prefixed with the node id
    builder.field("node_id", nodeId);
    return toShortXContent(builder);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods are no longer used right? Maybe we can remove them as well as not implementing the ToXContentFragment interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 0cacdc7


/** The percentage of total heap that is free; the complement of {@link #usedHeapAsPercentage()}. */
public double freeHeapAsPercentage() {
    return 100.0 - usedHeapAsPercentage();
}

/**
 * The percentage of total heap that is used. Returns 100% when {@code totalBytes} is zero
 * (a node with no heap capacity has no free heap), which avoids the {@code NaN} that
 * {@code 0.0 / 0.0} would otherwise produce and propagate to {@link #freeHeapAsPercentage()}.
 */
public double usedHeapAsPercentage() {
    if (totalBytes == 0) {
        return 100.0;
    }
    return 100.0 * usedBytes / (double) totalBytes;
}

/** The number of unused heap bytes, i.e. {@code totalBytes - usedBytes}. */
public long freeBytes() {
    return totalBytes - usedBytes;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we have tests for these methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 55637b6


/**
 * Reduces a percentage to one decimal place for display.
 * NOTE(review): despite the name, this rounds rather than truncates — consider renaming.
 */
private static double truncatePercent(double pct) {
    return Math.round(pct * 10.0) / 10.0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.action.ActionListener;

import java.util.Map;

/**
 * Supplies per-node heap usage estimates for the whole cluster. Implementations can be
 * provided by plugins; {@link #EMPTY} is the fallback when none is registered and it
 * reports no data at all.
 */
public interface HeapUsageSupplier {

    /**
     * This will be used when there are no heap usage suppliers available
     */
    HeapUsageSupplier EMPTY = listener -> listener.onResponse(Map.of());

    /**
     * Get the heap usage for every node in the cluster
     *
     * @param listener The listener which will receive the results
     */
    void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile IndicesStatsSummary indicesStatsSummary;
private volatile Map<String, HeapUsage> nodesHeapUsage;

private final ThreadPool threadPool;
private final Client client;
Expand All @@ -97,13 +98,15 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private final Object mutex = new Object();
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();

private HeapUsageSupplier heapUsageSupplier = HeapUsageSupplier.EMPTY;
private AsyncRefresh currentRefresh;
private RefreshScheduler refreshScheduler;

@SuppressWarnings("this-escape")
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodesHeapUsage = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
Expand Down Expand Up @@ -131,6 +134,16 @@ void setUpdateFrequency(TimeValue updateFrequency) {
this.updateFrequency = updateFrequency;
}

/**
* This can be provided by plugins, which are initialised long after the ClusterInfoService is created
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be provided by plugins, shouldn't it be backed by JvmInfo/JvmStats?

Copy link
Contributor Author

@nicktindall nicktindall Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heap usage supplier supplies estimated heap usage for the whole cluster, the single implementation we have of this exists only in Serverless at the moment, we're using the heap usage estimates provided by MemoryMetricsService which are aggregated on the master using transport layer messages.

The heap usage in this case is not the JVM-provided heap usage but the estimate for the shards that are allocated to a node and the merges that are running on a node (i.e. not "real" heap usage). The estimate is the same one we use for autoscaling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that's why it needed to be pluggable, because at this stage it only exists in serverless.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heap usage in this case is not the JVM-provided heap usage but the estimate for the shards that are allocated to a node

Let's call it something other than "heap usage" then? Overloading terms increase complexity of understanding the codebase, and "heap usage" already has a well defined meaning.

I would expect this to use a term that includes "shard" since it is shard level data.

*
* @param heapUsageSupplier The HeapUsageSupplier to use
*/
public void setHeapUsageSupplier(HeapUsageSupplier heapUsageSupplier) {
assert this.heapUsageSupplier == HeapUsageSupplier.EMPTY;
this.heapUsageSupplier = heapUsageSupplier;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
final Runnable newRefresh;
Expand Down Expand Up @@ -173,6 +186,7 @@ void execute() {
logger.trace("skipping collecting info from cluster, notifying listeners with empty cluster info");
leastAvailableSpaceUsages = Map.of();
mostAvailableSpaceUsages = Map.of();
nodesHeapUsage = Map.of();
indicesStatsSummary = IndicesStatsSummary.EMPTY;
callListeners();
return;
Expand All @@ -187,9 +201,26 @@ void execute() {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchIndicesStats();
}
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodesHeapUsage();
}
}
}

private void fetchNodesHeapUsage() {
heapUsageSupplier.getClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(Map<String, HeapUsage> stringHeapUsageMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming nit:

Suggested change
public void onResponse(Map<String, HeapUsage> stringHeapUsageMap) {
public void onResponse(Map<String, HeapUsage> currentHeapUsageMap) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in e26b62f

nodesHeapUsage = stringHeapUsageMap;
}

@Override
public void onFailure(Exception e) {
logger.warn("failed to fetch heap usage for nodes", e);
}
}, fetchRefs.acquire()));
}

private void fetchIndicesStats() {
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
Expand Down Expand Up @@ -413,7 +444,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardDataSetSizes,
indicesStatsSummary.dataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodesHeapUsage
);
}

Expand Down
Loading