Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/reference/modules/cross-cluster-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ GET /cluster_one:twitter/_search
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 2,
"_shards": {
"total": 1,
"successful": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
terms:
field: f1.keyword

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
Expand Down Expand Up @@ -63,6 +64,7 @@
terms:
field: f1.keyword

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
Expand All @@ -83,6 +85,7 @@
terms:
field: f1.keyword

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -103,6 +106,7 @@
terms:
field: f1.keyword

- is_false: num_reduce_phases
- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
Expand Down Expand Up @@ -133,6 +137,7 @@
rest_total_hits_as_int: true
index: test_remote_cluster:test_index

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand Down Expand Up @@ -162,6 +167,7 @@
rest_total_hits_as_int: true
index: "*:test_index"

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
Expand All @@ -176,6 +182,7 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -192,6 +199,7 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -208,6 +216,7 @@
rest_total_hits_as_int: true
index: "my_remote_cluster:single_doc_index"

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
query:
match_all: {}

- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
Expand All @@ -28,6 +29,7 @@
rest_total_hits_as_int: true
body: { "scroll_id": "$scroll_id", "scroll": "1m"}

- is_false: num_reduce_phases
- is_false: _clusters
- match: {hits.total: 6 }
- length: {hits.hits: 2 }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,20 +714,18 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
final boolean finalReduce = request.getLocalClusterAlias() == null;

if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
trackTotalHitsUpTo, finalReduce);
trackTotalHitsUpTo, request.isFinalReduce());
}
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, finalReduce);
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private final String localClusterAlias;
private final long absoluteStartMillis;
private final boolean finalReduce;

private SearchType searchType = SearchType.DEFAULT;

Expand Down Expand Up @@ -102,13 +103,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
}

/**
* Constructs a new search request from the provided search request
*/
public SearchRequest(SearchRequest searchRequest) {
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis);
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias,
searchRequest.absoluteStartMillis, searchRequest.finalReduce);
}

/**
Expand All @@ -132,25 +135,30 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
}

/**
* Creates a new search request by providing the search request to copy all fields from, the indices to search against,
* the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time.
* Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction
* on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the
* alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters
* to ensure that the same value is used.
* Creates a new search request by providing the search request to copy all fields from, the indices to search against, the alias of
* the cluster where it will be executed, as well as the start time in milliseconds from the epoch time and whether the reduction
* should be final or not. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request
* performing reduction on each cluster in order to minimize network round-trips between the coordinating node and the remote clusters.
*
* @param originalSearchRequest the original search request
* @param indices the indices to search against
* @param localClusterAlias the alias to prefix index names with in the returned search results
* @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
* @param finalReduce whether the reduction should be final or not
*/
static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices,
String localClusterAlias, long absoluteStartMillis) {
String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
Objects.requireNonNull(originalSearchRequest, "search request must not be null");
validateIndices(indices);
Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis);
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis, finalReduce);
}

private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) {
private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis,
boolean finalReduce) {
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
this.batchedReduceSize = searchRequest.batchedReduceSize;
this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips;
Expand All @@ -167,6 +175,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
this.types = searchRequest.types;
this.localClusterAlias = localClusterAlias;
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
}

/**
Expand Down Expand Up @@ -203,6 +212,12 @@ public SearchRequest(StreamInput in) throws IOException {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
//TODO move to the 6_7_0 branch once backported to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
finalReduce = in.readBoolean();
} else {
finalReduce = true;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ccsMinimizeRoundtrips = in.readBoolean();
}
Expand Down Expand Up @@ -232,6 +247,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(absoluteStartMillis);
}
}
//TODO move to the 6_7_0 branch once backported to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(finalReduce);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(ccsMinimizeRoundtrips);
}
Expand Down Expand Up @@ -277,11 +296,18 @@ String getLocalClusterAlias() {
return localClusterAlias;
}

/**
* Returns whether the reduction phase that will be performed needs to be final or not.
*/
boolean isFinalReduce() {
return finalReduce;
}

/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided
* current time, otherwise it will return {@link System#currentTimeMillis()}.
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long, boolean)}, this method returns
* the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
*
*/
long getOrCreateAbsoluteStartMillis() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.search;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
Expand All @@ -35,8 +36,10 @@
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
Expand All @@ -47,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
Expand Down Expand Up @@ -497,4 +501,12 @@ public String toString() {
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 0);
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, tookInMillisSupplier.get(),
ShardSearchFailure.EMPTY_ARRAY, clusters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
//if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
//we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 0);
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY, clusters);
return SearchResponse.empty(searchTimeProvider::buildTookInMillis, clusters);
}
int totalShards = 0;
int skippedShards = 0;
Expand Down
Loading