Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -257,20 +259,47 @@ public void testFinalReduce() {
public void testWaitForRefreshIndexValidation() throws Exception {
int numberOfShards = randomIntBetween(3, 10);
assertAcked(prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
assertAcked(prepareCreate("test2").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
assertAcked(prepareCreate("test3").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
client().admin().indices().prepareAliases().addAlias("test1", "testAlias").get();
client().admin().indices().prepareAliases().addAlias(new String[] {"test2", "test3"}, "testFailedAlias").get();

long[] validCheckpoints = new long[numberOfShards];
Arrays.fill(validCheckpoints, SequenceNumbers.UNASSIGNED_SEQ_NO);

// no exception
client().prepareSearch("testAlias").get();
client().prepareSearch("testAlias").setWaitForCheckpoints(Collections.singletonMap("testAlias", validCheckpoints)).get();


IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> client().prepareSearch("testAlias").setWaitForCheckpoints(Collections.singletonMap("testAlias", new long[0])).get());
assertThat(e.getMessage(), containsString("Index configured with wait_for_checkpoints must be a concrete index resolved in this " +
"search. Index [testAlias] is not a concrete index resolved in this search."));
() -> client().prepareSearch("testFailedAlias")
.setWaitForCheckpoints(Collections.singletonMap("testFailedAlias", validCheckpoints))
.get());
assertThat(e.getMessage(), containsString("Failed to resolve wait_for_checkpoints target [testFailedAlias]. Configured target " +
"must resolve to a single open index."));

IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class,
() -> client().prepareSearch("test1").setWaitForCheckpoints(Collections.singletonMap("test1", new long[2])).get());
() -> client().prepareSearch("test1")
.setWaitForCheckpoints(Collections.singletonMap("test1", new long[2]))
.get());
assertThat(e2.getMessage(), containsString("Index configured with wait_for_checkpoints must search the same number of shards as " +
"checkpoints provided. [2] checkpoints provided. Index [test1] has [" + numberOfShards + "] shards."));
"checkpoints provided. [2] checkpoints provided. Target [test1] which resolved to index [test1] has [" + numberOfShards +
"] shards."));

IllegalArgumentException e3 = expectThrows(IllegalArgumentException.class,
() -> client().prepareSearch("testAlias")
.setWaitForCheckpoints(Collections.singletonMap("testAlias", new long[2]))
.get());
assertThat(e3.getMessage(), containsString("Index configured with wait_for_checkpoints must search the same number of shards as " +
"checkpoints provided. [2] checkpoints provided. Target [testAlias] which resolved to index [test1] has [" + numberOfShards +
"] shards."));

IllegalArgumentException e4 = expectThrows(IllegalArgumentException.class,
() -> client().prepareSearch("testAlias")
.setWaitForCheckpoints(Collections.singletonMap("test2", validCheckpoints))
.get());
assertThat(e4.getMessage(), containsString("Target configured with wait_for_checkpoints must be a concrete index resolved in " +
"this search. Target [test2] is not a concrete index resolved in this search."));
}

public void testShardCountLimit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
Expand Down Expand Up @@ -711,7 +712,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
if (remoteShardIterators.isEmpty() == false) {
throw new IllegalArgumentException("Cannot use wait_for_checkpoints parameter with cross-cluster searches.");
} else {
validateWaitForCheckpoint(clusterState, searchRequest, concreteLocalIndices);
validateAndResolveWaitForCheckpoint(clusterState, indexNameExpressionResolver, searchRequest, concreteLocalIndices);
}
}

Expand Down Expand Up @@ -889,24 +890,46 @@ public void run() {
}
}

private static void validateWaitForCheckpoint(ClusterState clusterState, SearchRequest searchRequest, String[] concreteLocalIndices) {
private static void validateAndResolveWaitForCheckpoint(ClusterState clusterState, IndexNameExpressionResolver resolver,
SearchRequest searchRequest, String[] concreteLocalIndices) {
HashSet<String> searchedIndices = new HashSet<>(Arrays.asList(concreteLocalIndices));
HashMap<String, long[]> newWaitForCheckpoints = new HashMap<>(searchRequest.getWaitForCheckpoints().size());
for (Map.Entry<String, long[]> waitForCheckpointIndex : searchRequest.getWaitForCheckpoints().entrySet()) {
int checkpointsProvided = waitForCheckpointIndex.getValue().length;
String index = waitForCheckpointIndex.getKey();
long[] checkpoints = waitForCheckpointIndex.getValue();
int checkpointsProvided = checkpoints.length;
String target = waitForCheckpointIndex.getKey();
Index resolved;
try {
resolved = resolver.concreteSingleIndex(clusterState, new IndicesRequest() {
@Override
public String[] indices() {
return new String[] { target };
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
});
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve wait_for_checkpoints target [" + target + "]. Configured target " +
"must resolve to a single open index.", e);
}
String index = resolved.getName();
IndexMetadata indexMetadata = clusterState.metadata().index(index);
if (searchedIndices.contains(index) == false) {
throw new IllegalArgumentException("Index configured with wait_for_checkpoints must be a concrete index resolved in " +
"this search. Index [" + index + "] is not a concrete index resolved in this search.");
throw new IllegalArgumentException("Target configured with wait_for_checkpoints must be a concrete index resolved in " +
"this search. Target [" + target + "] is not a concrete index resolved in this search.");
} else if (indexMetadata == null) {
throw new IllegalArgumentException("Cannot find index configured for wait_for_checkpoints parameter [" + index + "].");
} else if (indexMetadata.getNumberOfShards() != checkpointsProvided) {
throw new IllegalArgumentException("Index configured with wait_for_checkpoints must search the same number of shards as " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also change to "Target" here?

Suggested change
throw new IllegalArgumentException("Index configured with wait_for_checkpoints must search the same number of shards as " +
throw new IllegalArgumentException("Target configured with wait_for_checkpoints must search the same number of shards as " +

"checkpoints provided. [" + checkpointsProvided + "] checkpoints provided. Index [" + index + "] has " +
"[" + indexMetadata.getNumberOfShards() + "] shards.");

"checkpoints provided. [" + checkpointsProvided + "] checkpoints provided. Target [" + target + "] which resolved to " +
"index [" + index + "] has " + "[" + indexMetadata.getNumberOfShards() + "] shards.");
}
newWaitForCheckpoints.put(index, checkpoints);
}
searchRequest.setWaitForCheckpoints(Collections.unmodifiableMap(newWaitForCheckpoints));
}

private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ setup:
- match: { hits.total.value: 2 }

---
"Cannot use alias":
"Can use alias":
- do:
catch: bad_request
fleet.search:
index: "test-alias"
allow_partial_search_results: false
wait_for_checkpoints: 1
body: { query: { match_all: {} } }

- match: { _shards.successful: 1 }
- match: { hits.total.value: 2 }

---
"Must provide correct number of checkpoints":
- do:
Expand Down Expand Up @@ -102,9 +104,13 @@ setup:
body:
- {"index": "test-after-refresh", "allow_partial_search_results" : false, wait_for_checkpoints: 1}
- {query: { match_all: {} } }
- { "index": "test-alias", "allow_partial_search_results": false, wait_for_checkpoints: 1 }
- { query: { match_all: { } } }
- {"index": "test-refresh-disabled", "allow_partial_search_results": false, wait_for_checkpoints: 2}
- {query: { match_all: {} } }

- match: { responses.0._shards.successful: 1 }
- match: { responses.0.hits.total.value: 2 }
- match: { responses.1.error.caused_by.type: "illegal_argument_exception" }
- match: { responses.1._shards.successful: 1 }
- match: { responses.1.hits.total.value: 2 }
- match: { responses.2.error.caused_by.type: "illegal_argument_exception" }