Skip to content

Commit 50cdbe5

Browse files
authored
simulate disk usage during balance calculation (#90061)
1 parent 990a1a2 commit 50cdbe5

File tree

6 files changed

+425
-28
lines changed

6 files changed

+425
-28
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster;
10+
11+
import org.elasticsearch.cluster.routing.ShardRouting;
12+
import org.elasticsearch.index.shard.ShardId;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
import java.util.Objects;
17+
18+
public class ClusterInfoSimulator {
19+
20+
private final Map<String, DiskUsage> leastAvailableSpaceUsage;
21+
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
22+
private final Map<String, Long> shardSizes;
23+
private final Map<ShardId, Long> shardDataSetSizes;
24+
private final Map<ClusterInfo.NodeAndShard, String> dataPath;
25+
26+
public ClusterInfoSimulator(ClusterInfo clusterInfo) {
27+
this.leastAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeLeastAvailableDiskUsages());
28+
this.mostAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeMostAvailableDiskUsages());
29+
this.shardSizes = new HashMap<>(clusterInfo.shardSizes);
30+
this.shardDataSetSizes = new HashMap<>(clusterInfo.shardDataSetSizes);
31+
this.dataPath = new HashMap<>(clusterInfo.dataPath);
32+
}
33+
34+
public void simulate(ShardRouting shard) {
35+
assert shard.initializing();
36+
37+
var size = getEstimatedShardSize(shard);
38+
if (size != null && size > 0) {
39+
if (shard.relocatingNodeId() != null) {
40+
// relocation
41+
modifyDiskUsage(shard.relocatingNodeId(), getShardPath(shard.relocatingNodeId(), mostAvailableSpaceUsage), size);
42+
modifyDiskUsage(shard.currentNodeId(), getShardPath(shard.currentNodeId(), leastAvailableSpaceUsage), -size);
43+
} else {
44+
// new shard
45+
modifyDiskUsage(shard.currentNodeId(), getShardPath(shard.currentNodeId(), leastAvailableSpaceUsage), -size);
46+
shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shard), size);
47+
}
48+
}
49+
}
50+
51+
private Long getEstimatedShardSize(ShardRouting routing) {
52+
if (routing.relocatingNodeId() != null) {
53+
// relocation existing shard, get size of the source shard
54+
return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing));
55+
} else if (routing.primary() == false) {
56+
// initializing new replica, get size of the source primary shard
57+
return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing.shardId(), true));
58+
} else {
59+
// initializing new (empty) primary
60+
return 0L;
61+
}
62+
}
63+
64+
private String getShardPath(String nodeId, Map<String, DiskUsage> defaultSpaceUsage) {
65+
var diskUsage = defaultSpaceUsage.get(nodeId);
66+
return diskUsage != null ? diskUsage.getPath() : null;
67+
}
68+
69+
private void modifyDiskUsage(String nodeId, String path, long delta) {
70+
var leastUsage = leastAvailableSpaceUsage.get(nodeId);
71+
if (leastUsage != null && Objects.equals(leastUsage.getPath(), path)) {
72+
// ensure new value is within bounds
73+
leastAvailableSpaceUsage.put(nodeId, updateWithFreeBytes(leastUsage, delta));
74+
}
75+
var mostUsage = mostAvailableSpaceUsage.get(nodeId);
76+
if (mostUsage != null && Objects.equals(mostUsage.getPath(), path)) {
77+
// ensure new value is within bounds
78+
mostAvailableSpaceUsage.put(nodeId, updateWithFreeBytes(mostUsage, delta));
79+
}
80+
}
81+
82+
private static DiskUsage updateWithFreeBytes(DiskUsage usage, long delta) {
83+
// free bytes might go out of range in case when multiple data path are used
84+
// we might not know exact disk used to allocate a shard and conservatively update
85+
// most used disk on a target node and least used disk on a source node
86+
var freeBytes = withinRange(0, usage.getTotalBytes(), usage.freeBytes() + delta);
87+
return usage.copyWithFreeBytes(freeBytes);
88+
}
89+
90+
private static long withinRange(long min, long max, long value) {
91+
return Math.max(min, Math.min(max, value));
92+
}
93+
94+
public ClusterInfo getClusterInfo() {
95+
return new ClusterInfo(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, Map.of());
96+
}
97+
}

server/src/main/java/org/elasticsearch/cluster/DiskUsage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ public String toString() {
119119
+ "]";
120120
}
121121

122+
public DiskUsage copyWithFreeBytes(long freeBytes) {
123+
return new DiskUsage(nodeId, nodeName, path, totalBytes, freeBytes);
124+
}
125+
122126
/**
123127
* Finds the path with the least available disk space and returns its disk usage. It returns null if there is no
124128
* file system data in the NodeStats or if the total bytes are a negative number.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class RoutingAllocation {
5050

5151
private final ClusterState clusterState;
5252

53-
private final ClusterInfo clusterInfo;
53+
private ClusterInfo clusterInfo;
5454

5555
private final SnapshotShardSizeInfo shardSizeInfo;
5656

@@ -407,6 +407,11 @@ public boolean isSimulating() {
407407
return isSimulating;
408408
}
409409

410+
public void setSimulatedClusterInfo(ClusterInfo clusterInfo) {
411+
assert isSimulating : "Should be called only while simulating";
412+
this.clusterInfo = clusterInfo;
413+
}
414+
410415
public RoutingAllocation immutableClone() {
411416
return new RoutingAllocation(deciders, clusterState, clusterInfo, shardSizeInfo, currentNanoTime);
412417
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.cluster.ClusterInfoSimulator;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.cluster.routing.RoutingNodes;
1516
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -58,7 +59,7 @@ public DesiredBalance compute(
5859
final var changes = routingAllocation.changes();
5960
final var ignoredShards = desiredBalanceInput.ignoredShards();
6061
final var knownNodeIds = routingAllocation.nodes().stream().map(DiscoveryNode::getId).collect(toSet());
61-
final var unassignedPrimaries = new HashSet<ShardId>();
62+
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation.clusterInfo());
6263

6364
if (routingNodes.isEmpty()) {
6465
return new DesiredBalance(desiredBalanceInput.index(), Map.of());
@@ -68,14 +69,15 @@ public DesiredBalance compute(
6869
for (final var routingNode : routingNodes) {
6970
for (final var shardRouting : routingNode) {
7071
if (shardRouting.initializing()) {
72+
clusterInfoSimulator.simulate(shardRouting);
7173
routingNodes.startShard(logger, shardRouting, changes, 0L);
72-
// TODO adjust disk usage info to reflect the assumed shard movement
7374
}
7475
}
7576
}
7677

7778
// we are not responsible for allocating unassigned primaries of existing shards, and we're only responsible for allocating
7879
// unassigned replicas if the ReplicaShardAllocator gives up, so we must respect these ignored shards
80+
final var unassignedPrimaries = new HashSet<ShardId>();
7981
final var shardRoutings = new HashMap<ShardId, ShardRoutings>();
8082
for (final var primary : new boolean[] { true, false }) {
8183
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
@@ -134,12 +136,9 @@ public DesiredBalance compute(
134136
for (final var shardRouting : shardsToRelocate) {
135137
assert shardRouting.started();
136138
if (targetNodesIterator.hasNext()) {
137-
routingNodes.startShard(
138-
logger,
139-
routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2(),
140-
changes,
141-
0L
142-
);
139+
ShardRouting shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2();
140+
clusterInfoSimulator.simulate(shardToRelocate);
141+
routingNodes.startShard(logger, shardToRelocate, changes, 0L);
143142
} else {
144143
break;
145144
}
@@ -163,7 +162,9 @@ public DesiredBalance compute(
163162
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
164163
if (nodeIds != null && nodeIds.isEmpty() == false) {
165164
final String nodeId = nodeIds.removeFirst();
166-
routingNodes.startShard(logger, unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes), changes, 0L);
165+
ShardRouting shardToInitialized = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes);
166+
clusterInfoSimulator.simulate(shardToInitialized);
167+
routingNodes.startShard(logger, shardToInitialized, changes, 0L);
167168
}
168169
}
169170
}
@@ -175,7 +176,9 @@ public DesiredBalance compute(
175176
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
176177
if (nodeIds != null && nodeIds.isEmpty() == false) {
177178
final String nodeId = nodeIds.removeFirst();
178-
routingNodes.startShard(logger, unassignedReplicaIterator.initialize(nodeId, null, 0L, changes), changes, 0L);
179+
ShardRouting shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes);
180+
clusterInfoSimulator.simulate(shardToInitialize);
181+
routingNodes.startShard(logger, shardToInitialize, changes, 0L);
179182
}
180183
}
181184
}
@@ -215,6 +218,7 @@ public DesiredBalance compute(
215218
// TODO test that we reset ignored shards properly
216219
}
217220

221+
routingAllocation.setSimulatedClusterInfo(clusterInfoSimulator.getClusterInfo());
218222
logger.trace("running delegate allocator");
219223
delegateAllocator.allocate(routingAllocation);
220224
assert routingNodes.unassigned().size() == 0; // any unassigned shards should now be ignored
@@ -224,9 +228,9 @@ public DesiredBalance compute(
224228
for (final var shardRouting : routingNode) {
225229
if (shardRouting.initializing()) {
226230
hasChanges = true;
231+
clusterInfoSimulator.simulate(shardRouting);
227232
routingNodes.startShard(logger, shardRouting, changes, 0L);
228233
logger.trace("starting shard {}", shardRouting);
229-
// TODO adjust disk usage info to reflect the assumed shard movement
230234
}
231235
}
232236
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.routing.allocation.allocator;
10+
11+
import org.elasticsearch.cluster.ClusterInfo;
12+
import org.elasticsearch.cluster.ClusterInfoSimulator;
13+
import org.elasticsearch.cluster.DiskUsage;
14+
import org.elasticsearch.cluster.routing.ShardRouting;
15+
import org.elasticsearch.test.ESTestCase;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
21+
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
22+
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
23+
import static org.hamcrest.Matchers.equalTo;
24+
25+
public class ClusterInfoSimulatorTests extends ESTestCase {
26+
27+
public void testInitializeNewPrimary() {
28+
29+
var newPrimary = newShardRouting("index-1", 0, "node-0", true, INITIALIZING);
30+
31+
var simulator = new ClusterInfoSimulator(
32+
new ClusterInfoTestBuilder() //
33+
.withNode("node-0", new DiskUsageBuilder(1000, 1000))
34+
.withNode("node-1", new DiskUsageBuilder(1000, 1000))
35+
.withShard(newPrimary, 0)
36+
.build()
37+
);
38+
simulator.simulate(newPrimary);
39+
40+
assertThat(
41+
simulator.getClusterInfo(),
42+
equalTo(
43+
new ClusterInfoTestBuilder() //
44+
.withNode("node-0", new DiskUsageBuilder(1000, 1000))
45+
.withNode("node-1", new DiskUsageBuilder(1000, 1000))
46+
.withShard(newPrimary, 0)
47+
.build()
48+
)
49+
);
50+
}
51+
52+
public void testInitializeNewReplica() {
53+
54+
var existingPrimary = newShardRouting("index-1", 0, "node-0", true, STARTED);
55+
var newReplica = newShardRouting("index-1", 0, "node-1", false, INITIALIZING);
56+
57+
var simulator = new ClusterInfoSimulator(
58+
new ClusterInfoTestBuilder() //
59+
.withNode("node-0", new DiskUsageBuilder(1000, 900))
60+
.withNode("node-1", new DiskUsageBuilder(1000, 1000))
61+
.withShard(existingPrimary, 100)
62+
.withShard(newReplica, 0)
63+
.build()
64+
);
65+
simulator.simulate(newReplica);
66+
67+
assertThat(
68+
simulator.getClusterInfo(),
69+
equalTo(
70+
new ClusterInfoTestBuilder() //
71+
.withNode("node-0", new DiskUsageBuilder(1000, 900))
72+
.withNode("node-1", new DiskUsageBuilder(1000, 900))
73+
.withShard(existingPrimary, 100)
74+
.withShard(newReplica, 100)
75+
.build()
76+
)
77+
);
78+
}
79+
80+
public void testRelocateShard() {
81+
82+
var fromNodeId = "node-0";
83+
var toNodeId = "node-1";
84+
85+
var shard = newShardRouting("index-1", 0, toNodeId, fromNodeId, true, INITIALIZING);
86+
87+
var simulator = new ClusterInfoSimulator(
88+
new ClusterInfoTestBuilder() //
89+
.withNode(fromNodeId, new DiskUsageBuilder(1000, 900))
90+
.withNode(toNodeId, new DiskUsageBuilder(1000, 1000))
91+
.withShard(shard, 100)
92+
.build()
93+
);
94+
simulator.simulate(shard);
95+
96+
assertThat(
97+
simulator.getClusterInfo(),
98+
equalTo(
99+
new ClusterInfoTestBuilder() //
100+
.withNode(fromNodeId, new DiskUsageBuilder(1000, 1000))
101+
.withNode(toNodeId, new DiskUsageBuilder(1000, 900))
102+
.withShard(shard, 100)
103+
.build()
104+
)
105+
);
106+
}
107+
108+
public void testRelocateShardWithMultipleDataPath1() {
109+
110+
var fromNodeId = "node-0";
111+
var toNodeId = "node-1";
112+
113+
var shard = newShardRouting("index-1", 0, toNodeId, fromNodeId, true, INITIALIZING);
114+
115+
var simulator = new ClusterInfoSimulator(
116+
new ClusterInfoTestBuilder() //
117+
.withNode(fromNodeId, new DiskUsageBuilder("/data-1", 1000, 500), new DiskUsageBuilder("/data-2", 1000, 750))
118+
.withNode(toNodeId, new DiskUsageBuilder("/data-1", 1000, 750), new DiskUsageBuilder("/data-2", 1000, 900))
119+
.withShard(shard, 100)
120+
.build()
121+
);
122+
simulator.simulate(shard);
123+
124+
assertThat(
125+
simulator.getClusterInfo(),
126+
equalTo(
127+
new ClusterInfoTestBuilder() //
128+
.withNode(fromNodeId, new DiskUsageBuilder("/data-1", 1000, 500), new DiskUsageBuilder("/data-2", 1000, 850))
129+
.withNode(toNodeId, new DiskUsageBuilder("/data-1", 1000, 650), new DiskUsageBuilder("/data-2", 1000, 900))
130+
.withShard(shard, 100)
131+
.build()
132+
)
133+
);
134+
}
135+
136+
private static class ClusterInfoTestBuilder {
137+
138+
private final Map<String, DiskUsage> leastAvailableSpaceUsage = new HashMap<>();
139+
private final Map<String, DiskUsage> mostAvailableSpaceUsage = new HashMap<>();
140+
private final Map<String, Long> shardSizes = new HashMap<>();
141+
142+
public ClusterInfoTestBuilder withNode(String name, DiskUsageBuilder diskUsageBuilderBuilder) {
143+
leastAvailableSpaceUsage.put(name, diskUsageBuilderBuilder.toDiskUsage(name));
144+
mostAvailableSpaceUsage.put(name, diskUsageBuilderBuilder.toDiskUsage(name));
145+
return this;
146+
}
147+
148+
public ClusterInfoTestBuilder withNode(String name, DiskUsageBuilder leastAvailableSpace, DiskUsageBuilder mostAvailableSpace) {
149+
leastAvailableSpaceUsage.put(name, leastAvailableSpace.toDiskUsage(name));
150+
mostAvailableSpaceUsage.put(name, mostAvailableSpace.toDiskUsage(name));
151+
return this;
152+
}
153+
154+
public ClusterInfoTestBuilder withShard(ShardRouting shard, long size) {
155+
shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shard), size);
156+
return this;
157+
}
158+
159+
public ClusterInfo build() {
160+
return new ClusterInfo(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), Map.of());
161+
}
162+
}
163+
164+
private record DiskUsageBuilder(String path, long total, long free) {
165+
166+
private DiskUsageBuilder(long total, long free) {
167+
this("/data", total, free);
168+
}
169+
170+
public DiskUsage toDiskUsage(String name) {
171+
return new DiskUsage(name, name, name + '/' + path, total, free);
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)