Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand Down Expand Up @@ -531,8 +530,7 @@ public void testResolvePath() throws Exception {
nodeNameToNodeId.put(cursor.getValue().getName(), cursor.getKey());
}

final GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { indexName }, false);
final List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { indexName }, false);
final List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
final ShardRouting shardRouting = iterators.iterator().next().nextOrNull();
assertThat(shardRouting, notNullValue());
Expand Down Expand Up @@ -571,8 +569,7 @@ public void testResolvePath() throws Exception {

private Path getPathToShardData(String indexName, String dirSuffix) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { indexName }, false);
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { indexName }, false);
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -311,8 +310,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
}
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { "test" }, false);
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { "test" }, false);
for (ShardIterator iterator : shardIterators) {
ShardRouting routing;
while ((routing = iterator.nextOrNull()) != null) {
Expand Down Expand Up @@ -667,7 +665,7 @@ public void testReplicaCorruption() throws Exception {

private int numShards(String... index) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<?> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
List<?> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
return shardIterators.size();
}

Expand Down Expand Up @@ -695,8 +693,7 @@ private ShardRouting corruptRandomPrimaryFile() throws IOException {
private ShardRouting corruptRandomPrimaryFile(final boolean includePerCommitFiles) throws IOException {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Index test = state.metadata().index("test").getIndex();
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { "test" }, false);
List<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[] { "test" }, false);
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.search.stats.SearchStats;
Expand All @@ -24,6 +23,7 @@
import org.elasticsearch.test.ESIntegTestCase;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -146,7 +146,7 @@ private SearchRequestBuilder addSuggestions(SearchRequestBuilder request, int i)

private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator) {
Expand All @@ -161,7 +161,7 @@ private Set<String> nodeIdsWithIndex(String... indices) {

protected int numAssignedShards(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
return allAssignedShardsGrouped.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -165,7 +164,7 @@ public void testSimpleStats() throws Exception {

private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator) {
Expand Down Expand Up @@ -248,7 +247,7 @@ public void testOpenContexts() {

protected int numAssignedShards(String... indices) {
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
GroupShardsIterator<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
List<?> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
return allAssignedShardsGrouped.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -34,6 +33,7 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -101,7 +101,7 @@ protected void masterOperation(
}

Set<String> nodeIds = new HashSet<>();
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
List<ShardIterator> groupShardsIterator = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -214,13 +213,8 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
}

@Override
protected GroupShardsIterator<ShardIterator> shards(
ClusterState clusterState,
AnalyzeIndexDiskUsageRequest request,
String[] concreteIndices
) {
final GroupShardsIterator<ShardIterator> groups = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, null, null);
protected List<ShardIterator> shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) {
final List<ShardIterator> groups = clusterService.operationRouting().searchShards(clusterState, concreteIndices, null, null);
for (ShardIterator group : groups) {
// fails fast if any non-active groups
if (group.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -145,7 +144,7 @@ protected ShardValidateQueryResponse readShardResponse(StreamInput in) throws IO
}

@Override
protected GroupShardsIterator<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
final String routing;
if (request.allShards()) {
routing = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -93,7 +92,7 @@ final class RequestDispatcher {
this.onComplete = new RunOnce(onComplete);
this.indexSelectors = ConcurrentCollections.newConcurrentMap();
for (String index : indices) {
final GroupShardsIterator<ShardIterator> shardIts;
final List<ShardIterator> shardIts;
try {
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null);
} catch (Exception e) {
Expand Down Expand Up @@ -250,7 +249,7 @@ private static class IndexSelector {
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
private final Map<ShardId, Exception> failures = new HashMap<>();

IndexSelector(GroupShardsIterator<ShardIterator> shardIts) {
IndexSelector(List<ShardIterator> shardIts) {
for (ShardIterator shardIt : shardIts) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.Maps;
Expand Down Expand Up @@ -61,9 +60,9 @@
import static org.elasticsearch.core.Strings.format;

/**
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator}
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link List<SearchShardIterator>}
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link List<SearchShardIterator>} which is later
* referred to as the {@code shardIndex}.
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
* distributed frequencies
Expand Down Expand Up @@ -94,8 +93,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
protected final List<SearchShardIterator> toSkipShardsIts;
protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
Expand All @@ -117,7 +116,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Executor executor,
SearchRequest request,
ActionListener<SearchResponse> listener,
GroupShardsIterator<SearchShardIterator> shardsIts,
List<SearchShardIterator> shardsIts,
SearchTimeProvider timeProvider,
ClusterState clusterState,
SearchTask task,
Expand All @@ -136,8 +135,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.toSkipShardsIts = toSkipIterators;
this.shardsIts = iterators;
outstandingShards = new AtomicInteger(shardsIts.size());
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
// we later compute the shard index based on the natural order of the shards
Expand Down Expand Up @@ -172,8 +171,8 @@ protected void notifyListShards(
SearchSourceBuilder sourceBuilder
) {
progressListener.notifyListShards(
SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts),
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
clusters,
sourceBuilder == null || sourceBuilder.size() > 0,
timeProvider
Expand Down Expand Up @@ -257,7 +256,7 @@ void skipShard(SearchShardIterator iterator) {
successfulShardExecution();
}

private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> shardsIts) {
private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
for (SearchShardIterator it : shardsIts) {
if (it.getTargetNodeIds().isEmpty() == false) {
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {
Expand Down
Loading