Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.ShardAndIndexHeapUsage;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -105,6 +106,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -304,6 +306,49 @@ public void testHeapUsageEstimateIsPresent() {
}
}

public void testShardHeapUsagesArePresent() {
// Create some indices so we can later expect a precise number of (random) shard-level heap usage reports.
final int numIndices = randomIntBetween(1, 5);
final int numShards = randomIntBetween(1, 3);
final String indexPrefix = randomIdentifier();
IntStream.range(0, numIndices).forEach(i -> {
final String indexName = indexPrefix + "_" + i;
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build());
});

InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);

Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages = clusterInfoService.getClusterInfo().getEstimatedShardHeapUsages();
assertNotNull(estimatedShardHeapUsages);
// No shard heap usage is reported because it is not yet enabled.
assertTrue(estimatedShardHeapUsages.isEmpty());

// Enable collection of heap usages for ClusterInfo.
updateClusterSettings(
Settings.builder()
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.build()
);

try {
ClusterInfoServiceUtils.refresh(clusterInfoService);
estimatedShardHeapUsages = clusterInfoService.getClusterInfo().getEstimatedShardHeapUsages();
assertNotNull(estimatedShardHeapUsages);
assertEquals(estimatedShardHeapUsages.size(), numIndices * numShards);
for (var entry : estimatedShardHeapUsages.entrySet()) {
assertThat(entry.getValue().shardHeapUsageBytes(), greaterThanOrEqualTo(0L));
assertThat(entry.getValue().indexHeapUsageBytes(), greaterThanOrEqualTo(0L));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only have to put a single document in each index and we'd get a non-zero value here wouldn't we? (because of the mapping size in bytes at least)

Then we could assert greaterThan(0L) which seems stronger?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not certain what you mean. The heap values are randomly generated for this test, anywhere from 0 to Long.MAX_VALUE. So even if some writes were done, that would not guarantee a non-zero value?

Stateful doesn't have real metrics for heap usage, that's only in serverless.

} finally {
updateClusterSettings(
Settings.builder()
.putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey())
.build()
);
}
}

public void testNodeWriteLoadsArePresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);

Expand Down Expand Up @@ -970,6 +1015,25 @@ public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener)
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong()))
);
}

@Override
public void collectShardHeapUsage(ActionListener<Map<ShardId, ShardAndIndexHeapUsage>> listener) {
    // Fabricate a random heap usage report for every started shard copy in the cluster;
    // stateful clusters have no real shard-heap metrics, so tests only need plausible values.
    ActionListener.completeWith(
        listener,
        () -> plugin.getClusterService()
            .state()
            .getRoutingNodes()
            .stream()
            .map(node -> node.started())
            .flatMap(startedShards -> StreamSupport.stream(startedShards.spliterator(), false))
            .collect(
                Collectors.toUnmodifiableMap(
                    shardRouting -> shardRouting.shardId(),
                    shardRouting -> new ShardAndIndexHeapUsage(randomNonNegativeLong(), randomNonNegativeLong()),
                    // Started copies of the same shard (primary + replicas on different nodes) share
                    // a ShardId; without a merge function toUnmodifiableMap throws on the duplicate.
                    // Values are random anyway, so keeping the first is fine.
                    (first, second) -> first
                )
            )
    );
}
}

public static class BogusEstimatedHeapUsagePlugin extends Plugin implements ClusterPlugin {
Expand Down
35 changes: 33 additions & 2 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS
private static final TransportVersion NODES_WRITE_LOAD_HOTSPOTTING_IN_CLUSTER_INFO = TransportVersion.fromName(
"nodes_write_load_hotspotting_in_cluster_info"
);
private static final TransportVersion SHARD_HEAP_USAGE_IN_CLUSTER_INFO = TransportVersion.fromName("shard_heap_usage_in_cluster_info");

private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
Expand All @@ -71,6 +72,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
final Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages;
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
final Map<ShardId, Double> shardWriteLoads;
// max heap size per node ID
Expand All @@ -79,7 +81,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardS
private final Map<ShardId, Set<String>> shardToNodeIds;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Set.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Set.of());
}

/**
Expand All @@ -92,6 +94,7 @@ protected ClusterInfo() {
* @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param estimatedHeapUsages estimated heap usage broken down by node
* @param estimatedShardHeapUsages estimated heap usage broken down by {@link ShardId}
* @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node
* @see #shardIdentifierFromRouting
* @param maxHeapSizePerNode node id to max heap size
Expand All @@ -105,6 +108,7 @@ public ClusterInfo(
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages,
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
Map<ShardId, Double> shardWriteLoads,
Map<String, ByteSizeValue> maxHeapSizePerNode,
Expand All @@ -118,6 +122,7 @@ public ClusterInfo(
dataPath,
reservedSpace,
estimatedHeapUsages,
estimatedShardHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads,
maxHeapSizePerNode,
Expand All @@ -134,6 +139,7 @@ private ClusterInfo(
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages,
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
Map<ShardId, Double> shardWriteLoads,
Map<String, ByteSizeValue> maxHeapSizePerNode,
Expand All @@ -147,6 +153,7 @@ private ClusterInfo(
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
this.estimatedShardHeapUsages = Map.copyOf(estimatedShardHeapUsages);
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
this.maxHeapSizePerNode = Map.copyOf(maxHeapSizePerNode);
Expand Down Expand Up @@ -186,6 +193,11 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeIdsWriteLoadHotspotting = Set.of();
}
if (in.getTransportVersion().supports(SHARD_HEAP_USAGE_IN_CLUSTER_INFO)) {
this.estimatedShardHeapUsages = in.readImmutableMap(ShardId::new, ShardAndIndexHeapUsage::new);
} else {
this.estimatedShardHeapUsages = Map.of();
}
this.shardToNodeIds = computeShardToNodeIds(dataPath);
}

Expand All @@ -195,6 +207,7 @@ ClusterInfo updateWith(
Map<String, Long> shardSizes,
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages,
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
) {
return new ClusterInfo(
Expand All @@ -205,6 +218,7 @@ ClusterInfo updateWith(
dataPath,
reservedSpace,
estimatedHeapUsages,
estimatedShardHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads,
maxHeapSizePerNode,
Expand Down Expand Up @@ -253,6 +267,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(NODES_WRITE_LOAD_HOTSPOTTING_IN_CLUSTER_INFO)) {
out.writeStringCollection(this.nodeIdsWriteLoadHotspotting);
}
if (out.getTransportVersion().supports(SHARD_HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.estimatedShardHeapUsages);
}
}

/**
Expand Down Expand Up @@ -334,7 +351,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
}),
endArray() // end "reserved_sizes"
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads/maxHeapSizePerNode/
// nodeIdsWriteLoadHotspotting at this stage, to avoid committing to API payloads until the features are settled
// nodeIdsWriteLoadHotspotting/estimatedShardHeapUsages at this stage, to avoid committing to API payloads until
// the features are settled
);
}

Expand All @@ -349,6 +367,10 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
return estimatedHeapUsages;
}

/**
 * Returns the estimated heap usage per shard, keyed by {@link ShardId}. Empty when no
 * shard-level estimates were collected.
 */
public Map<ShardId, ShardAndIndexHeapUsage> getEstimatedShardHeapUsages() {
    return estimatedShardHeapUsages;
}

/**
* Returns a map containing thread pool usage stats for each node, keyed by node ID.
*/
Expand Down Expand Up @@ -455,6 +477,7 @@ public boolean equals(Object o) {
&& dataPath.equals(that.dataPath)
&& reservedSpace.equals(that.reservedSpace)
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
&& estimatedShardHeapUsages.equals(that.estimatedShardHeapUsages)
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
&& shardWriteLoads.equals(that.shardWriteLoads)
&& maxHeapSizePerNode.equals(that.maxHeapSizePerNode)
Expand All @@ -471,6 +494,7 @@ public int hashCode() {
dataPath,
reservedSpace,
estimatedHeapUsages,
estimatedShardHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads,
maxHeapSizePerNode,
Expand Down Expand Up @@ -596,6 +620,7 @@ public static class Builder {
private Map<NodeAndShard, String> dataPath = Map.of();
private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
private Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages = Map.of();
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
private Map<ShardId, Double> shardWriteLoads = Map.of();
private Map<String, ByteSizeValue> maxHeapSizePerNode = Map.of();
Expand All @@ -610,6 +635,7 @@ public ClusterInfo build() {
dataPath,
reservedSpace,
estimatedHeapUsages,
estimatedShardHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads,
maxHeapSizePerNode,
Expand Down Expand Up @@ -652,6 +678,11 @@ public Builder estimatedHeapUsages(Map<String, EstimatedHeapUsage> estimatedHeap
return this;
}

/**
 * Sets the estimated per-shard heap usages to include in the built {@link ClusterInfo}.
 */
public Builder estimatedShardHeapUsages(Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages) {
    this.estimatedShardHeapUsages = estimatedShardHeapUsages;
    return this;
}

public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools) {
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ClusterInfoSimulator {
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
private final CopyOnFirstWriteMap<String, Long> shardSizes;
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
private final Map<ShardId, ShardAndIndexHeapUsage> estimatedShardHeapUsages;
private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;

public ClusterInfoSimulator(RoutingAllocation allocation) {
Expand All @@ -47,6 +48,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.mostAvailableSpaceUsage = getAdjustedDiskSpace(allocation, allocation.clusterInfo().getNodeMostAvailableDiskUsages());
this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes);
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
this.estimatedShardHeapUsages = allocation.clusterInfo().getEstimatedShardHeapUsages();
this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
}

Expand Down Expand Up @@ -192,6 +194,7 @@ public ClusterInfo getClusterInfo() {
shardSizes.toImmutableMap(),
Map.of(),
estimatedHeapUsages,
estimatedShardHeapUsages,
shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.cluster;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.shard.ShardId;

import java.util.Map;

Expand All @@ -25,12 +26,29 @@ public interface EstimatedHeapUsageCollector {
/**
* This will be used when there is no EstimatedHeapUsageCollector available
*/
EstimatedHeapUsageCollector EMPTY = listener -> listener.onResponse(Map.of());
EstimatedHeapUsageCollector EMPTY = new EstimatedHeapUsageCollector() {
    // No-op collector: both collection methods immediately answer with empty maps.
    @Override
    public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) {
        listener.onResponse(Map.of());
    }

    @Override
    public void collectShardHeapUsage(ActionListener<Map<ShardId, ShardAndIndexHeapUsage>> listener) {
        listener.onResponse(Map.of());
    }
};

/**
* Collect the estimated heap usage for every node in the cluster
*
* @param listener The listener which will receive the results
*/
void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener);

/**
* Collects the estimated heap usage for every shard in the cluster.
*
* @param listener The listener which will receive the results
*/
void collectShardHeapUsage(ActionListener<Map<ShardId, ShardAndIndexHeapUsage>> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private class AsyncRefresh {
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, ByteSizeValue> maxHeapPerNode;
private volatile Map<String, Long> estimatedHeapUsagePerNode;
private volatile Map<ShardId, ShardAndIndexHeapUsage> estimatedHeapUsagePerShard;
private volatile Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStatsPerNode;
private volatile IndicesStatsSummary indicesStatsSummary;

Expand All @@ -231,7 +232,7 @@ void execute() {
try (var ignoredRefs = fetchRefs) {
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled());
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
maybeFetchEstimatedHeapUsage(estimatedHeapThresholdEnabled);
fetchNodesUsageStatsForThreadPools();
}
}
Expand Down Expand Up @@ -260,14 +261,15 @@ private void maybeFetchNodeStats(boolean shouldFetch) {
}
}

private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) {
// Fetches both node-level and shard-level heap usage estimates when the estimated-heap
// threshold decider is enabled; otherwise publishes empty maps so stale data is not retained.
private void maybeFetchEstimatedHeapUsage(boolean shouldFetch) {
    if (shouldFetch) {
        // Clear the trace context so the async fetch is not attributed to the triggering request.
        try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
            fetchEstimatedHeapUsage();
        }
    } else {
        logger.trace("skipping collecting estimated heap usage from cluster, notifying listeners with empty estimated heap usage");
        estimatedHeapUsagePerNode = Map.of();
        estimatedHeapUsagePerShard = Map.of();
    }
}

Expand All @@ -292,7 +294,7 @@ public void onFailure(Exception e) {
}
}

private void fetchNodesEstimatedHeapUsage() {
private void fetchEstimatedHeapUsage() {
estimatedHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(Map<String, Long> currentEstimatedHeapUsages) {
Expand All @@ -305,6 +307,19 @@ public void onFailure(Exception e) {
estimatedHeapUsagePerNode = Map.of();
}
}, fetchRefs.acquire()));

// Shard-level estimates are fetched with their own ref, so the refresh round completes only
// after this listener fires — independently of the node-level collection above.
estimatedHeapUsageCollector.collectShardHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
    @Override
    public void onResponse(Map<ShardId, ShardAndIndexHeapUsage> currentEstimatedHeapUsages) {
        estimatedHeapUsagePerShard = currentEstimatedHeapUsages;
    }

    @Override
    public void onFailure(Exception e) {
        // Best effort: a failed fetch degrades to an empty map instead of failing the refresh.
        logger.warn("failed to fetch heap usage for shards", e);
        estimatedHeapUsagePerShard = Map.of();
    }
}, fetchRefs.acquire()));
}

private void fetchIndicesStats() {
Expand Down Expand Up @@ -497,6 +512,7 @@ private ClusterInfo updateAndGetCurrentClusterInfo() {
indicesStatsSummary.dataPath,
indicesStatsSummary.reservedSpace,
estimatedHeapUsages,
estimatedHeapUsagePerShard,
nodeThreadPoolUsageStatsPerNode,
indicesStatsSummary.shardWriteLoads(),
maxHeapPerNode,
Expand Down
Loading