Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5c634a7
wip cluster info simulation
idegtiarenko Sep 14, 2022
342ba59
use cluster info simulator
idegtiarenko Sep 14, 2022
d0fa4c6
null check
idegtiarenko Sep 15, 2022
105fcef
Fix MockDiskUsagesIT
idegtiarenko Sep 15, 2022
7293edd
do not keep reserved info
idegtiarenko Sep 15, 2022
779016b
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 19, 2022
5c919f9
multiple data path test case
idegtiarenko Sep 20, 2022
103610a
add range check
idegtiarenko Sep 20, 2022
5aaad09
testComputeConsideringShardSizes
idegtiarenko Sep 21, 2022
cb7a174
keep node below watermark
idegtiarenko Sep 21, 2022
5faf34e
move class
idegtiarenko Sep 21, 2022
a8072ca
fix move
idegtiarenko Sep 21, 2022
2985353
rework test
idegtiarenko Sep 21, 2022
07be67f
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 21, 2022
480ad7a
Update server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
idegtiarenko Sep 21, 2022
ee9a19f
Update server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
idegtiarenko Sep 21, 2022
8c92948
make sure test is failing without simulated disk space
idegtiarenko Sep 21, 2022
2f1e6fb
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 26, 2022
67ee7dd
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 29, 2022
52fa7d3
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 29, 2022
93a72dc
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Oct 3, 2022
b64afdd
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,9 @@ public class ShardStats implements Writeable, ToXContentFragment {
private final CommitStats commitStats;
@Nullable
private final SeqNoStats seqNoStats;

@Nullable
private final RetentionLeaseStats retentionLeaseStats;

/**
 * Returns the retention lease stats held by this instance.
 *
 * @return the retention lease stats; the backing field is declared {@code @Nullable}, so this may be {@code null}
 */
public RetentionLeaseStats getRetentionLeaseStats() {
    return this.retentionLeaseStats;
}

private final String dataPath;
private final String statePath;
private final boolean isCustomDataPath;
Expand All @@ -67,21 +57,43 @@ public ShardStats(StreamInput in) throws IOException {
}

public ShardStats(
final ShardRouting routing,
final ShardRouting shardRouting,
final ShardPath shardPath,
final CommonStats commonStats,
final CommitStats commitStats,
final SeqNoStats seqNoStats,
final RetentionLeaseStats retentionLeaseStats
) {
this.shardRouting = routing;
this.dataPath = shardPath.getRootDataPath().toString();
this.statePath = shardPath.getRootStatePath().toString();
this.isCustomDataPath = shardPath.isCustomDataPath();
this.commitStats = commitStats;
this(
shardRouting,
commonStats,
commitStats,
seqNoStats,
retentionLeaseStats,
shardPath.getRootDataPath().toString(),
shardPath.getRootStatePath().toString(),
shardPath.isCustomDataPath()
);
}

/**
 * Creates shard stats from already-resolved path information, without requiring a {@code ShardPath}.
 *
 * @param shardRouting        routing entry of the shard these stats belong to
 * @param commonStats         common stats of the shard
 * @param commitStats         commit stats (the backing field is {@code @Nullable})
 * @param seqNoStats          sequence number stats (the backing field is {@code @Nullable})
 * @param retentionLeaseStats retention lease stats (the backing field is {@code @Nullable})
 * @param dataPath            data path of the shard, as a string
 * @param statePath           state path of the shard, as a string
 * @param isCustomDataPath    whether the shard uses a custom data path
 */
public ShardStats(
    ShardRouting shardRouting,
    CommonStats commonStats,
    CommitStats commitStats,
    SeqNoStats seqNoStats,
    RetentionLeaseStats retentionLeaseStats,
    String dataPath,
    String statePath,
    boolean isCustomDataPath
) {
    // location of the shard on disk
    this.dataPath = dataPath;
    this.statePath = statePath;
    this.isCustomDataPath = isCustomDataPath;

    // routing entry and the individual stats groups
    this.shardRouting = shardRouting;
    this.commonStats = commonStats;
    this.commitStats = commitStats;
    this.seqNoStats = seqNoStats;
    this.retentionLeaseStats = retentionLeaseStats;
}

@Override
Expand Down Expand Up @@ -125,6 +137,15 @@ public SeqNoStats getSeqNoStats() {
return this.seqNoStats;
}

/**
 * Returns the retention lease stats held by this instance.
 *
 * @return the retention lease stats; the backing field is declared {@code @Nullable}, so this may be {@code null}
 */
public RetentionLeaseStats getRetentionLeaseStats() {
    return this.retentionLeaseStats;
}

/**
 * Returns the data path recorded for this shard.
 *
 * @return the data path as a string
 */
public String getDataPath() {
    return this.dataPath;
}
Expand Down
47 changes: 41 additions & 6 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -35,15 +36,15 @@
*/
public class ClusterInfo implements ToXContentFragment, Writeable {

public static final ClusterInfo EMPTY = new ClusterInfo();
public static final Version DATA_SET_SIZE_SIZE_VERSION = Version.V_7_13_0;

private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
final Map<String, Long> shardSizes;
final Map<ShardId, Long> shardDataSetSizes;
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
public final Map<String, Long> shardSizes;
public final Map<ShardId, Long> shardDataSetSizes;
public final Map<ShardRouting, String> routingToDataPath;
public final Map<NodeAndPath, ReservedSpace> reservedSpace;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
Expand Down Expand Up @@ -224,7 +225,41 @@ public ReservedSpace getReservedSpace(String nodeId, String dataPath) {
* includes a 'p' or 'r' depending on whether the shard is a primary.
*/
public static String shardIdentifierFromRouting(ShardRouting shardRouting) {
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
return shardIdentifierFromRouting(shardRouting.shardId(), shardRouting.primary());
}

/**
 * Builds the shard identifier from a shard id and its primary flag: the string form of the
 * shard id followed by {@code [p]} for a primary or {@code [r]} for a replica.
 *
 * @param shardId the shard id
 * @param primary {@code true} for a primary copy, {@code false} for a replica
 * @return the identifier string
 */
public static String shardIdentifierFromRouting(ShardId shardId, boolean primary) {
    final String copyMarker = primary ? "p" : "r";
    return shardId.toString() + "[" + copyMarker + "]";
}

/**
 * Compares this instance with another object. Two instances are equal when they are of exactly
 * the same class and all six maps (least/most available space usage, shard sizes, shard data set
 * sizes, routing-to-data-path and reserved space) are equal.
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} is the same instance or an equal instance of the same class
 */
@Override
public boolean equals(Object o) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use EqualsHashCodeTestUtils.checkEqualsAndHashCode to test this in ClusterInfoTests. Alternatively add a dedicated AbstractWireSerializingTestCase derivative for ClusterInfo which would cover equals and hashcode as well as replacing ClusterInfoTests#testSerialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the change to equals/hashCode and their tests be ported to main separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that'd be good.

// identity and exact-class checks come first so the cast below is safe
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterInfo that = (ClusterInfo) o;
return leastAvailableSpaceUsage.equals(that.leastAvailableSpaceUsage)
&& mostAvailableSpaceUsage.equals(that.mostAvailableSpaceUsage)
&& shardSizes.equals(that.shardSizes)
&& shardDataSetSizes.equals(that.shardDataSetSizes)
&& routingToDataPath.equals(that.routingToDataPath)
&& reservedSpace.equals(that.reservedSpace);
}

/**
 * Computes the hash code from the six maps held by this instance, in the order: least available
 * space usage, most available space usage, shard sizes, shard data set sizes, routing-to-data-path,
 * reserved space.
 *
 * @return the combined hash code
 */
@Override
public int hashCode() {
    return Objects.hash(
        leastAvailableSpaceUsage, mostAvailableSpaceUsage,
        shardSizes, shardDataSetSizes,
        routingToDataPath, reservedSpace
    );
}

/**
 * Renders this instance as a string by delegating to {@code Strings.toString}.
 *
 * @return the string form produced by {@code Strings.toString(this, true, false)}
 */
@Override
public String toString() {
// NOTE(review): the two flags are presumably pretty-print=true and human-readable=false — confirm against Strings.toString
return Strings.toString(this, true, false);
}

/**
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/elasticsearch/cluster/DiskUsage.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class DiskUsage implements ToXContentFragment, Writeable {
public DiskUsage(String nodeId, String nodeName, String path, long totalBytes, long freeBytes) {
this.nodeId = nodeId;
this.nodeName = nodeName;
this.freeBytes = freeBytes;
this.totalBytes = totalBytes;
this.path = path;
this.totalBytes = totalBytes;
this.freeBytes = freeBytes;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reordered to match constructor args order. Will port it to the main branch separately later.

}

public DiskUsage(StreamInput in) throws IOException {
Expand Down Expand Up @@ -157,6 +157,10 @@ public String toString() {
+ "]";
}

/**
 * Creates a copy of this disk usage that keeps the node id, node name, path and total bytes
 * but uses the supplied number of free bytes.
 *
 * @param freeBytes the free bytes for the copy
 * @return a new {@code DiskUsage}; this instance is not modified
 */
public DiskUsage copyWithFreeBytes(long freeBytes) {
    return new DiskUsage(this.nodeId, this.nodeName, this.path, this.totalBytes, freeBytes);
}

/**
* Finds the path with the least available disk space and returns its disk usage. It returns null if there is no
* file system data in the NodeStats or if the total bytes are a negative number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
new HashMap<>();
buildShardLevelInfo(
clusterService.state().routingTable(),
stats,
adjustShardStats(stats),
shardSizeByIdentifierBuilder,
shardDataSetSizeBuilder,
dataPathByShardRoutingBuilder,
Expand Down Expand Up @@ -430,6 +430,10 @@ List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
return nodeStats;
}

/**
 * Returns the given shard stats unchanged. The result of this method is what gets passed on to
 * {@code buildShardLevelInfo} when an indices stats response is processed.
 * NOTE(review): presumably an override point (e.g. for tests), mirroring {@code adjustNodesStats} — confirm.
 *
 * @param shardStats the shard stats from the indices stats response
 * @return the same array that was passed in
 */
ShardStats[] adjustShardStats(ShardStats[] shardStats) {
return shardStats;
}

void refreshAsync(ActionListener<ClusterInfo> future) {
final Runnable newRefresh;
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class RoutingAllocation {

private final ClusterState clusterState;

private final ClusterInfo clusterInfo;
private ClusterInfo clusterInfo;

private final SnapshotShardSizeInfo shardSizeInfo;

Expand Down Expand Up @@ -367,6 +367,11 @@ public boolean isSimulating() {
return isSimulating;
}

/**
 * Replaces the cluster info held by this allocation. Asserts that this allocation is simulating.
 *
 * @param clusterInfo the simulated cluster info to use from now on
 */
public void setSimulatedClusterInfo(ClusterInfo clusterInfo) {
// only checked when assertions are enabled; the replacement itself is unconditional
assert isSimulating : "Should be called only while simulating";
this.clusterInfo = clusterInfo;
}

/**
 * Creates a new {@code RoutingAllocation} from this allocation's deciders, cluster state,
 * cluster info, shard size info and current nano time.
 *
 * @return the newly constructed allocation
 */
public RoutingAllocation immutableClone() {
    return new RoutingAllocation(
        deciders,
        clusterState,
        clusterInfo,
        shardSizeInfo,
        currentNanoTime
    );
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ClusterInfoSimulator {

private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
private final Map<String, Long> shardSizes;
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<ShardRouting, String> routingToDataPath;
private final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace;

public ClusterInfoSimulator(ClusterInfo clusterInfo) {
this.leastAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeLeastAvailableDiskUsages());
this.mostAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeMostAvailableDiskUsages());
this.shardSizes = new HashMap<>(clusterInfo.shardSizes);
this.shardDataSetSizes = new HashMap<>(clusterInfo.shardDataSetSizes);
this.routingToDataPath = new HashMap<>(clusterInfo.routingToDataPath);
this.reservedSpace = new HashMap<>(clusterInfo.reservedSpace);
}

public void simulate(ShardRouting shard) {
assert shard.initializing();

var size = getEstimatedShardSize(shard);
if (size != null && size > 0) {
if (shard.relocatingNodeId() != null) {
// relocation
increaseTargetDiskUsage(shard.relocatingNodeId(), size);
decreaseSourceDiskUsage(shard.currentNodeId(), size);
// TODO update shard data path?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routingToDataPath is keyed by a ShardRouting. If we want to update it as well then we need to also pass the state of the shard after it was initialized here.

} else {
// new shard
increaseTargetDiskUsage(shard.currentNodeId(), size);
this.shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shard), size);
}
}
}

private Long getEstimatedShardSize(ShardRouting routing) {
if (routing.relocatingNodeId() != null) {
// get size of the source shard
return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing));
} else if (routing.primary() == false) {
// get size of the source primary shard
return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing.shardId(), true));
} else {
// initializing new (empty) primary
return 0L;
}
}

private void increaseTargetDiskUsage(String nodeId, long size) {
var leastUsage = leastAvailableSpaceUsage.get(nodeId);
var mostUsage = mostAvailableSpaceUsage.get(nodeId);
if (leastUsage == null || mostUsage == null) {
return;
}

// if single data path is used then it is present (and needs to be updated in both maps)
// otherwise conservatively decrease only lest available space
if (Objects.equals(leastUsage.getPath(), mostUsage.getPath())) {
// single data dir is used
mostAvailableSpaceUsage.put(nodeId, mostUsage.copyWithFreeBytes(mostUsage.getFreeBytes() - size));
}
leastAvailableSpaceUsage.put(nodeId, leastUsage.copyWithFreeBytes(leastUsage.getFreeBytes() - size));
}

private void decreaseSourceDiskUsage(String nodeId, long size) {
var leastUsage = leastAvailableSpaceUsage.get(nodeId);
var mostUsage = mostAvailableSpaceUsage.get(nodeId);
if (leastUsage == null || mostUsage == null) {
return;
}

// if single data path is used then it is present (and needs to be updated in both maps)
// otherwise conservatively increase only most available space
if (Objects.equals(leastUsage.getPath(), mostUsage.getPath())) {
// single data dir is used
leastAvailableSpaceUsage.put(nodeId, leastUsage.copyWithFreeBytes(leastUsage.getFreeBytes() + size));
}
mostAvailableSpaceUsage.put(nodeId, mostUsage.copyWithFreeBytes(mostUsage.getFreeBytes() + size));
}

public ClusterInfo getClusterInfo() {
return new ClusterInfo(
leastAvailableSpaceUsage,
mostAvailableSpaceUsage,
shardSizes,
shardDataSetSizes,
routingToDataPath,
reservedSpace
);
}
}
Loading