-
Notifications
You must be signed in to change notification settings - Fork 25.6k
simulate disk usage during balance calculation #90061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
5c634a7
342ba59
d0fa4c6
105fcef
7293edd
779016b
5c919f9
103610a
5aaad09
cb7a174
5faf34e
a8072ca
2985353
07be67f
480ad7a
ee9a19f
8c92948
2f1e6fb
67ee7dd
52fa7d3
93a72dc
b64afdd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,10 +40,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable { | |
|
|
||
| private final Map<String, DiskUsage> leastAvailableSpaceUsage; | ||
| private final Map<String, DiskUsage> mostAvailableSpaceUsage; | ||
| final Map<String, Long> shardSizes; | ||
| final Map<ShardId, Long> shardDataSetSizes; | ||
| final Map<ShardRouting, String> routingToDataPath; | ||
| final Map<NodeAndPath, ReservedSpace> reservedSpace; | ||
| public final Map<String, Long> shardSizes; | ||
| public final Map<ShardId, Long> shardDataSetSizes; | ||
| public final Map<ShardRouting, String> routingToDataPath; | ||
| public final Map<NodeAndPath, ReservedSpace> reservedSpace; | ||
|
|
||
| protected ClusterInfo() { | ||
| this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); | ||
|
|
@@ -224,7 +224,36 @@ public ReservedSpace getReservedSpace(String nodeId, String dataPath) { | |
| * includes a 'p' or 'r' depending on whether the shard is a primary. | ||
| */ | ||
| public static String shardIdentifierFromRouting(ShardRouting shardRouting) { | ||
| return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]"; | ||
| return shardIdentifierFromRouting(shardRouting.shardId(), shardRouting.primary()); | ||
| } | ||
|
|
||
| public static String shardIdentifierFromRouting(ShardId shardId, boolean primary) { | ||
| return shardId.toString() + "[" + (primary ? "p" : "r") + "]"; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the change to equals/hashCode and their tests be ported to main separately?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes that'd be good. |
||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| ClusterInfo that = (ClusterInfo) o; | ||
| return leastAvailableSpaceUsage.equals(that.leastAvailableSpaceUsage) | ||
| && mostAvailableSpaceUsage.equals(that.mostAvailableSpaceUsage) | ||
| && shardSizes.equals(that.shardSizes) | ||
| && shardDataSetSizes.equals(that.shardDataSetSizes) | ||
| && routingToDataPath.equals(that.routingToDataPath) | ||
| && reservedSpace.equals(that.reservedSpace); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash( | ||
| leastAvailableSpaceUsage, | ||
| mostAvailableSpaceUsage, | ||
| shardSizes, | ||
| shardDataSetSizes, | ||
| routingToDataPath, | ||
| reservedSpace | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
| * in compliance with, at your election, the Elastic License 2.0 or the Server | ||
| * Side Public License, v 1. | ||
| */ | ||
|
|
||
| package org.elasticsearch.cluster.routing.allocation.allocator; | ||
|
|
||
| import org.elasticsearch.cluster.ClusterInfo; | ||
| import org.elasticsearch.cluster.DiskUsage; | ||
| import org.elasticsearch.cluster.routing.ShardRouting; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
| public class ClusterInfoSimulator { | ||
|
|
||
| private final Map<String, DiskUsage> leastAvailableSpaceUsage; | ||
| private final Map<String, DiskUsage> mostAvailableSpaceUsage; | ||
| private final Map<String, Long> shardSizes; | ||
| private final Map<ShardId, Long> shardDataSetSizes; | ||
| private final Map<ShardRouting, String> routingToDataPath; | ||
|
|
||
| public ClusterInfoSimulator(ClusterInfo clusterInfo) { | ||
| this.leastAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeLeastAvailableDiskUsages()); | ||
| this.mostAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeMostAvailableDiskUsages()); | ||
| this.shardSizes = new HashMap<>(clusterInfo.shardSizes); | ||
| this.shardDataSetSizes = new HashMap<>(clusterInfo.shardDataSetSizes); | ||
| this.routingToDataPath = new HashMap<>(clusterInfo.routingToDataPath); | ||
| } | ||
|
|
||
| public void simulate(ShardRouting shard) { | ||
| assert shard.initializing(); | ||
|
|
||
| var size = getEstimatedShardSize(shard); | ||
| if (size != null && size > 0) { | ||
| if (shard.relocatingNodeId() != null) { | ||
| // relocation | ||
| modifyDiskUsage(shard.relocatingNodeId(), getShardPath(shard.relocatingNodeId(), mostAvailableSpaceUsage), size); | ||
| modifyDiskUsage(shard.currentNodeId(), getShardPath(shard.currentNodeId(), leastAvailableSpaceUsage), -size); | ||
| } else { | ||
| // new shard | ||
| modifyDiskUsage(shard.currentNodeId(), getShardPath(shard.currentNodeId(), leastAvailableSpaceUsage), -size); | ||
| shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shard), size); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private Long getEstimatedShardSize(ShardRouting routing) { | ||
| if (routing.relocatingNodeId() != null) { | ||
| // relocation existing shard, get size of the source shard | ||
| return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing)); | ||
| } else if (routing.primary() == false) { | ||
| // initializing new replica, get size of the source primary shard | ||
| return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing.shardId(), true)); | ||
| } else { | ||
| // initializing new (empty) primary | ||
| return 0L; | ||
| } | ||
| } | ||
|
|
||
| private String getShardPath(String nodeId, Map<String, DiskUsage> defaultSpaceUsage) { | ||
| var diskUsage = defaultSpaceUsage.get(nodeId); | ||
| return diskUsage != null ? diskUsage.getPath() : null; | ||
| } | ||
|
|
||
| private void modifyDiskUsage(String nodeId, String path, long delta) { | ||
| var leastUsage = leastAvailableSpaceUsage.get(nodeId); | ||
| if (leastUsage != null && Objects.equals(leastUsage.getPath(), path)) { | ||
| // ensure new value is within bounds | ||
| leastAvailableSpaceUsage.put(nodeId, updateWithFreeBytes(leastUsage, delta)); | ||
| } | ||
| var mostUsage = mostAvailableSpaceUsage.get(nodeId); | ||
| if (mostUsage != null && Objects.equals(mostUsage.getPath(), path)) { | ||
| // ensure new value is within bounds | ||
| mostAvailableSpaceUsage.put(nodeId, updateWithFreeBytes(mostUsage, delta)); | ||
| } | ||
| } | ||
|
|
||
| private static DiskUsage updateWithFreeBytes(DiskUsage usage, long delta) { | ||
| // free bytes might go out of range in case when multiple data path are used | ||
| // we might not know exact disk used to allocate a shard and conservatively update | ||
| // most used disk on a target node and least used disk on a source node | ||
| var freeBytes = withinRange(0, usage.getTotalBytes(), usage.freeBytes() + delta); | ||
| return usage.copyWithFreeBytes(freeBytes); | ||
| } | ||
|
|
||
| private static long withinRange(long min, long max, long value) { | ||
| return Math.max(min, Math.min(max, value)); | ||
| } | ||
|
|
||
| public ClusterInfo getClusterInfo() { | ||
| return new ClusterInfo( | ||
| leastAvailableSpaceUsage, | ||
| mostAvailableSpaceUsage, | ||
| shardSizes, | ||
| shardDataSetSizes, | ||
| routingToDataPath, | ||
| Map.of() | ||
| ); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,8 +56,7 @@ public DesiredBalance compute( | |
| final var routingNodes = routingAllocation.routingNodes(); | ||
| final var ignoredShards = new HashSet<>(desiredBalanceInput.ignoredShards()); | ||
| final var changes = routingAllocation.changes(); | ||
| final var knownNodeIds = routingAllocation.nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); | ||
| final var unassignedPrimaries = new HashSet<ShardId>(); | ||
| final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation.clusterInfo()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we enhance |
||
|
|
||
| if (routingNodes.isEmpty()) { | ||
| return new DesiredBalance(desiredBalanceInput.index(), Map.of()); | ||
|
|
@@ -67,14 +66,15 @@ public DesiredBalance compute( | |
| for (final var routingNode : routingNodes) { | ||
| for (final var shardRouting : routingNode) { | ||
| if (shardRouting.initializing()) { | ||
| clusterInfoSimulator.simulate(shardRouting); | ||
| routingNodes.startShard(logger, shardRouting, changes); | ||
| // TODO adjust disk usage info to reflect the assumed shard movement | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // we are not responsible for allocating unassigned primaries of existing shards, and we're only responsible for allocating | ||
| // unassigned replicas if the ReplicaShardAllocator gives up, so we must respect these ignored shards | ||
| final var unassignedPrimaries = new HashSet<ShardId>(); | ||
| final var shardRoutings = new HashMap<ShardId, ShardRoutings>(); | ||
| for (final var primary : new boolean[] { true, false }) { | ||
| final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); | ||
|
|
@@ -98,6 +98,7 @@ public DesiredBalance compute( | |
| } | ||
|
|
||
| // we can assume that all possible shards will be allocated/relocated to one of their desired locations | ||
| final var knownNodeIds = routingAllocation.nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); | ||
| final var unassignedShardsToInitialize = new HashMap<ShardRouting, LinkedList<String>>(); | ||
| for (final var entry : shardRoutings.entrySet()) { | ||
| final var shardId = entry.getKey(); | ||
|
|
@@ -125,11 +126,9 @@ public DesiredBalance compute( | |
| for (final var shardRouting : shardsToRelocate) { | ||
| assert shardRouting.started(); | ||
| if (targetNodesIterator.hasNext()) { | ||
| routingNodes.startShard( | ||
| logger, | ||
| routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2(), | ||
| changes | ||
| ); | ||
| ShardRouting shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2(); | ||
| clusterInfoSimulator.simulate(shardToRelocate); | ||
| routingNodes.startShard(logger, shardToRelocate, changes); | ||
| } else { | ||
| break; | ||
| } | ||
|
|
@@ -153,7 +152,9 @@ public DesiredBalance compute( | |
| final var nodeIds = unassignedShardsToInitialize.get(shardRouting); | ||
| if (nodeIds != null && nodeIds.isEmpty() == false) { | ||
| final String nodeId = nodeIds.removeFirst(); | ||
| routingNodes.startShard(logger, unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes), changes); | ||
| ShardRouting shardToInitialized = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes); | ||
| clusterInfoSimulator.simulate(shardToInitialized); | ||
| routingNodes.startShard(logger, shardToInitialized, changes); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -165,7 +166,9 @@ public DesiredBalance compute( | |
| final var nodeIds = unassignedShardsToInitialize.get(shardRouting); | ||
| if (nodeIds != null && nodeIds.isEmpty() == false) { | ||
| final String nodeId = nodeIds.removeFirst(); | ||
| routingNodes.startShard(logger, unassignedReplicaIterator.initialize(nodeId, null, 0L, changes), changes); | ||
| ShardRouting shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes); | ||
| clusterInfoSimulator.simulate(shardToInitialize); | ||
| routingNodes.startShard(logger, shardToInitialize, changes); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -208,6 +211,7 @@ public DesiredBalance compute( | |
| // TODO test that we reset ignored shards properly | ||
| } | ||
|
|
||
| routingAllocation.setSimulatedClusterInfo(clusterInfoSimulator.getClusterInfo()); | ||
| logger.trace("running delegate allocator"); | ||
| delegateAllocator.allocate(routingAllocation); | ||
| assert routingNodes.unassigned().size() == 0; // any unassigned shards should now be ignored | ||
|
|
@@ -217,9 +221,9 @@ public DesiredBalance compute( | |
| for (final var shardRouting : routingNode) { | ||
| if (shardRouting.initializing()) { | ||
| hasChanges = true; | ||
| clusterInfoSimulator.simulate(shardRouting); | ||
| routingNodes.startShard(logger, shardRouting, changes); | ||
| logger.trace("starting shard {}", shardRouting); | ||
| // TODO adjust disk usage info to reflect the assumed shard movement | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.