Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@

package org.elasticsearch.action.fieldcaps;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.ObjectIntMap;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.carrotsearch.hppc.cursors.ObjectIntCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
Expand Down Expand Up @@ -261,9 +256,9 @@ public <T extends TransportResponse> void sendRequest(
assertThat(dispatcher.executionRound(), equalTo(maxRound + 1));
for (String index : indices) {
if (withFilter) {
ObjectIntMap<ShardId> copies = new ObjectIntHashMap<>();
Map<ShardId, Integer> copies = new HashMap<>();
for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) {
copies.addTo(shardRouting.shardId(), 1);
copies.merge(shardRouting.shardId(), 1, Integer::sum);
}
final int executedRounds = maxFailedRounds.getOrDefault(index, 0);
for (int round = 0; round <= executedRounds; round++) {
Expand All @@ -274,10 +269,10 @@ public <T extends TransportResponse> void sendRequest(
}
}
final Set<ShardId> availableShards = new HashSet<>();
for (ObjectIntCursor<ShardId> e : copies) {
if (e.value > 0) {
availableShards.add(e.key);
copies.addTo(e.key, -1);
for (var e : copies.entrySet()) {
if (e.getValue() > 0) {
availableShards.add(e.getKey());
copies.merge(e.getKey(), -1, Integer::sum);
}
}
assertThat("round: " + round, requestedShards, equalTo(availableShards));
Expand Down Expand Up @@ -381,9 +376,9 @@ public <T extends TransportResponse> void sendRequest(
assertThat(dispatcher.executionRound(), equalTo(maxRound));
for (String index : indices) {
if (withFilter) {
ObjectIntMap<ShardId> copies = new ObjectIntHashMap<>();
Map<ShardId, Integer> copies = new HashMap<>();
for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) {
copies.addTo(shardRouting.shardId(), 1);
copies.merge(shardRouting.shardId(), 1, Integer::sum);
}
final int executedRounds = failedIndices.contains(index) ? maxPossibleRounds(clusterState, index, true) : 0;
for (int round = 0; round <= executedRounds; round++) {
Expand All @@ -394,17 +389,17 @@ public <T extends TransportResponse> void sendRequest(
}
}
final Set<ShardId> availableShards = new HashSet<>();
for (ObjectIntCursor<ShardId> e : copies) {
if (e.value > 0) {
availableShards.add(e.key);
copies.addTo(e.key, -1);
for (var e : copies.entrySet()) {
if (e.getValue() > 0) {
availableShards.add(e.getKey());
copies.merge(e.getKey(), -1, Integer::sum);
}
}
assertThat("round: " + round, requestedShards, equalTo(availableShards));
}
if (failedIndices.contains(index)) {
for (ObjectIntCursor<ShardId> cursor : copies) {
assertThat("All copies of shard " + cursor.key + " must be tried", cursor.value, equalTo(0));
for (var e : copies.entrySet()) {
assertThat("All copies of shard " + e.getKey() + " must be tried", e.getValue(), equalTo(0));
}
}
} else {
Expand Down Expand Up @@ -657,10 +652,10 @@ void verifyAfterComplete() {
List<NodeRequest> nodeRequests = sentNodeRequests.stream().filter(r -> r.round == round).toList();
if (withFilter == false) {
// Without filter, each index is requested once in each round.
ObjectIntMap<String> requestsPerIndex = new ObjectIntHashMap<>();
nodeRequests.forEach(r -> r.indices().forEach(index -> requestsPerIndex.addTo(index, 1)));
for (ObjectIntCursor<String> e : requestsPerIndex) {
assertThat("index " + e.key + " has requested more than once", e.value, equalTo(1));
Map<String, Integer> requestsPerIndex = new HashMap<>();
nodeRequests.forEach(r -> r.indices().forEach(index -> requestsPerIndex.merge(index, 1, Integer::sum)));
for (var e : requestsPerIndex.entrySet()) {
assertThat("index " + e.getKey() + " has requested more than once", e.getValue(), equalTo(1));
}
}
// With or without filter, each new node receives at most one request each round
Expand Down Expand Up @@ -930,23 +925,23 @@ private ClusterState newClusterState(Metadata metadata, DiscoveryNodes discovery
static int maxPossibleRounds(ClusterState clusterState, String index, boolean withFilter) {
final IndexRoutingTable routingTable = clusterState.routingTable().index(index);
if (withFilter) {
ObjectIntMap<ShardId> numCopiesPerShard = new ObjectIntHashMap<>();
Map<ShardId, Integer> numCopiesPerShard = new HashMap<>();
for (ShardRouting shard : routingTable.randomAllActiveShardsIt()) {
numCopiesPerShard.addTo(shard.shardId(), 1);
numCopiesPerShard.merge(shard.shardId(), 1, Integer::sum);
}
int maxRound = 0;
for (ObjectIntCursor<ShardId> numCopies : numCopiesPerShard) {
maxRound = Math.max(maxRound, numCopies.value);
for (var numCopies : numCopiesPerShard.values()) {
maxRound = Math.max(maxRound, numCopies);
}
return maxRound;
} else {
ObjectIntMap<String> requestsPerNode = new ObjectIntHashMap<>();
Map<String, Integer> requestsPerNode = new HashMap<>();
for (ShardRouting shard : routingTable.randomAllActiveShardsIt()) {
requestsPerNode.put(shard.currentNodeId(), 1);
}
int totalRequests = 0;
for (IntCursor cursor : requestsPerNode.values()) {
totalRequests += cursor.value;
for (var nodeRequests : requestsPerNode.values()) {
totalRequests += nodeRequests;
}
return totalRequests;
}
Expand Down