Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ static Request search(SearchRequest searchRequest) throws IOException {
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
}
if (searchRequest.allowPartialSearchResults() != null) {
params.putParam("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults()));
}
params.putParam("batched_reduce_size", Integer.toString(searchRequest.getBatchedReduceSize()));
if (searchRequest.scroll() != null) {
params.putParam("scroll", searchRequest.scroll().keepAlive());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,10 @@ public void testSearch() throws Exception {
searchRequest.requestCache(randomBoolean());
expectedParams.put("request_cache", Boolean.toString(searchRequest.requestCache()));
}
if (randomBoolean()) {
searchRequest.allowPartialSearchResults(randomBoolean());
expectedParams.put("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults()));
}
if (randomBoolean()) {
searchRequest.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE));
}
Expand Down
12 changes: 9 additions & 3 deletions docs/reference/search/request-body.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ And here is a sample response:
aggregations and suggestions (no top hits returned).
See <<shard-request-cache>>.

`allow_partial_search_results`::

Set to `false` to return an overall failure if the request would produce partial
results. Defaults to `true`, which will allow partial results in the case of timeouts
or partial failures.

`terminate_after`::

The maximum number of documents to collect for each shard,
Expand All @@ -103,9 +109,9 @@ And here is a sample response:



Out of the above, the `search_type` and the `request_cache` must be passed as
query-string parameters. The rest of the search request should be passed
within the body itself. The body content can also be passed as a REST
Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results`
settings must be passed as query-string parameters. The rest of the search request should
be passed within the body itself. The body content can also be passed as a REST
parameter named `source`.

Both HTTP GET and HTTP POST can be used to execute search with body. Since not
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/search/uri-request.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ Defaults to no terminate_after.
Defaults to `query_then_fetch`. See
<<search-request-search-type,_Search Type_>> for
more details on the different types of search that can be performed.

|`allow_partial_search_results` |Set to `false` to return an overall failure if the request would produce
partial results. Defaults to `true`, which will allow partial results in the case of timeouts
or partial failures.
|=======================================================================
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
.requestCache(request.requestCache())
.scroll(request.scroll())
.indicesOptions(request.indicesOptions());
if (request.allowPartialSearchResults() != null) {
slices[slice].allowPartialSearchResults(request.allowPartialSearchResults());
}
}
return slices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@
"type" : "boolean",
"description": "Indicate if the number of documents that match the query should be tracked"
},
"allow_partial_search_results": {
"type" : "boolean",
"default" : true,
"description": "Indicate if an error should be returned if there is a partial search failure or timeout"
},
"typed_keys": {
"type" : "boolean",
"description" : "Specify whether aggregation and suggester names should be prefixed by their respective types in the response"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,27 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
}
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()),
cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
}
executePhase(nextPhase);
}
executePhase(nextPhase);
}
}

Expand Down Expand Up @@ -265,8 +279,16 @@ public final SearchRequest getRequest() {

@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {

ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}

return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), buildShardFailures(), clusters);
skippedOps.get(), buildTookInMillis(), failures, clusters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ private SearchRequest buildExpandSearchRequest(SearchRequest orig, SearchSourceB
.preference(orig.preference())
.routing(orig.routing())
.searchType(orig.searchType());
if (orig.allowPartialSearchResults() != null){
groupRequest.allowPartialSearchResults(orig.allowPartialSearchResults());
}
if (orig.isMaxConcurrentShardRequestsSet()) {
groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,27 @@ public final void run() throws IOException {
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
// Fail-fast verification of all shards being available
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
}
if (missingShards.length() >0) {
//Status red - shard is missing all copies and would produce partial results for an index search
final String msg = "Search rejected due to missing shards ["+ missingShards +
"]. Consider using `allow_partial_search_results` setting to bypass this error.";
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
for (int index = 0; index < maxConcurrentShardRequests; index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ public static void readMultiLineFormat(BytesReference data,
searchRequest.preference(nodeStringValue(value, null));
} else if ("routing".equals(entry.getKey())) {
searchRequest.routing(nodeStringValue(value, null));
} else if ("allow_partial_search_results".equals(entry.getKey())) {
searchRequest.allowPartialSearchResults(nodeBooleanValue(value, null));
}
}
defaultOptions = IndicesOptions.fromMap(source, defaultOptions);
Expand Down Expand Up @@ -297,6 +299,9 @@ public static byte[] writeMultiLineFormat(MultiSearchRequest multiSearchRequest,
if (request.routing() != null) {
xContentBuilder.field("routing", request.routing());
}
if (request.allowPartialSearchResults() != null) {
xContentBuilder.field("allow_partial_search_results", request.allowPartialSearchResults());
}
xContentBuilder.endObject();
xContentBuilder.bytes().writeTo(output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private Boolean requestCache;

private Boolean allowPartialSearchResults;


private Scroll scroll;

private int batchedReduceSize = 512;
Expand Down Expand Up @@ -113,6 +116,7 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
this.source = source;
}


@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down Expand Up @@ -305,6 +309,20 @@ public SearchRequest requestCache(Boolean requestCache) {
public Boolean requestCache() {
return this.requestCache;
}

/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
*/
public SearchRequest allowPartialSearchResults(boolean allowPartialSearchResults) {
this.allowPartialSearchResults = allowPartialSearchResults;
return this;
}

public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}


/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
Expand Down Expand Up @@ -427,6 +445,9 @@ public void readFrom(StreamInput in) throws IOException {
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
}

@Override
Expand All @@ -449,6 +470,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
}

@Override
Expand All @@ -471,13 +495,15 @@ public boolean equals(Object o) {
Objects.equals(batchedReduceSize, that.batchedReduceSize) &&
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions);
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
}

@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize);
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults);
}

@Override
Expand All @@ -494,6 +520,7 @@ public String toString() {
", maxConcurrentShardRequests=" + maxConcurrentShardRequests +
", batchedReduceSize=" + batchedReduceSize +
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", source=" + source + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,16 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) {
request.requestCache(requestCache);
return this;
}


/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
*/
public SearchRequestBuilder setAllowPartialSearchResults(boolean allowPartialSearchResults) {
request.allowPartialSearchResults(allowPartialSearchResults);
return this;
}

/**
* Should the query be profiled. Defaults to <code>false</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_THEN_FETCH);
}
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
}
if (searchRequest.isSuggestOnly()) {
// disable request cache if we have only suggest
searchRequest.requestCache(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}

if (request.hasParam("allow_partial_search_results")) {
// only set if we have the parameter passed to override the cluster-level default
searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
}

// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final TimeValue NO_TIMEOUT = timeValueMillis(-1);
public static final Setting<TimeValue> DEFAULT_SEARCH_TIMEOUT_SETTING =
Setting.timeSetting("search.default_search_timeout", NO_TIMEOUT, Property.Dynamic, Property.NodeScope);
public static final Setting<Boolean> DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS =
Setting.boolSetting("search.default_allow_partial_results", true, Property.Dynamic, Property.NodeScope);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this being node scope is that we could potentially have different values on different nodes (if the user sets this in elasticsearch.yml) which could cause unexpected behaviour. The same is actually already true for the default timeout above but I wonder if we should raise this in an issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, see my comment against the documentation about whether we should make this default to false in this PR to make the backport straightforward and then change the default in a follow up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure there is a "Property.ClusterScope"? All cluster-level settings I see (e.g. RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING ) come defined with Property.NodeScope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NO unfortunately there is no Property.ClusterScope so I think it makes it more important to have the setting resolved on the coordinating node so that all shards use the same value.



private final ThreadPool threadPool;
Expand All @@ -159,6 +161,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile TimeValue defaultSearchTimeout;

private volatile boolean defaultAllowPartialSearchResults;

private volatile boolean lowLevelCancellation;

private final Cancellable keepAliveReaper;
Expand Down Expand Up @@ -194,6 +198,11 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);

defaultAllowPartialSearchResults = DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
this::setDefaultAllowPartialSearchResults);


lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
}
Expand All @@ -216,6 +225,14 @@ private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) {
this.defaultSearchTimeout = defaultSearchTimeout;
}

private void setDefaultAllowPartialSearchResults(boolean defaultAllowPartialSearchResults) {
this.defaultAllowPartialSearchResults = defaultAllowPartialSearchResults;
}

public boolean defaultAllowPartialSearchResults() {
return defaultAllowPartialSearchResults;
}

private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}
Expand Down
Loading