Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;

private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;

private final String localClusterAlias;
private final long absoluteStartMillis;

private SearchType searchType = SearchType.DEFAULT;

Expand Down Expand Up @@ -95,6 +98,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}

/**
Expand All @@ -115,6 +119,7 @@ public SearchRequest(SearchRequest searchRequest) {
this.source = searchRequest.source;
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
}

/**
Expand All @@ -138,12 +143,17 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
}

/**
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
*/
SearchRequest(String localClusterAlias) {
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
this.absoluteStartMillis = absoluteStartMillis;
}

/**
Expand All @@ -155,10 +165,7 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
public SearchRequest(StreamInput in) throws IOException {
super(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
indices = in.readStringArray();
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
Expand All @@ -175,19 +182,22 @@ public SearchRequest(StreamInput in) throws IOException {
//TODO update version after backport
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
localClusterAlias = in.readOptionalString();
if (localClusterAlias != null) {
absoluteStartMillis = in.readVLong();
} else {
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
} else {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
Expand All @@ -204,6 +214,9 @@ public void writeTo(StreamOutput out) throws IOException {
//TODO update version after backport
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(localClusterAlias);
if (localClusterAlias != null) {
out.writeVLong(absoluteStartMillis);
}
}
}

Expand Down Expand Up @@ -243,6 +256,17 @@ String getLocalClusterAlias() {
return localClusterAlias;
}

/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. A non-null
* value (expected to be greater or equal than 0) indicates that this search request is being executed as part of a locally reduced
* cross-cluster search request. The provided current time is used to ensure that the same value, determined by the CCS coordinating
* node, is used on all clusters involved in the execution of the search request.
*/
@Nullable
Long getAbsoluteStartMillis() {
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? null : absoluteStartMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is called only once it could be called getOrCreateAbsoluteStartMillis() and return System.currentTimeMillis() if the absoluteStartMillis is not set (equals to `-1) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I will do that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could even use currentTimeMillis as a default in the constructor considering when the search request gets created on the coord node, if it wasn't for the transport client, in which case that would take the current time on the client instead of on the server.. Too bad :)

}

/**
* Sets the indices the search will be executed on.
*/
Expand Down Expand Up @@ -435,7 +459,6 @@ public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}


/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
Expand Down Expand Up @@ -498,13 +521,6 @@ public int getPreFilterShardSize() {
return preFilterShardSize;
}

/**
* Returns <code>true</code> iff the maxConcurrentShardRequest is set.
*/
boolean isMaxConcurrentShardRequestsSet() {
return maxConcurrentShardRequests != 0;
}

/**
* @return true if the request only has suggest
*/
Expand Down Expand Up @@ -538,7 +554,7 @@ public String getDescription() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

Expand All @@ -564,14 +580,15 @@ public boolean equals(Object o) {
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(localClusterAlias, that.localClusterAlias);
Objects.equals(localClusterAlias, that.localClusterAlias) &&
absoluteStartMillis == that.absoluteStartMillis;
}

@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults, localClusterAlias);
allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
}

@Override
Expand All @@ -590,6 +607,7 @@ public String toString() {
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", localClusterAlias=" + localClusterAlias +
", absoluteStartMillis=" + absoluteStartMillis +
", source=" + source + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ long getRelativeCurrentNanos() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
final long absoluteStartMillis = System.currentTimeMillis();
final long absoluteStartMillis = searchRequest.getAbsoluteStartMillis() != null ?
searchRequest.getAbsoluteStartMillis() : System.currentTimeMillis();
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,19 @@ protected SearchRequest createSearchRequest() throws IOException {
if (randomBoolean()) {
return super.createSearchRequest();
}
//clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
}

public void testClusterAliasValidation() {
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
SearchRequest searchRequest = new SearchRequest("", 0);
assertNull(searchRequest.validate());
}

public void testSerialization() throws Exception {
SearchRequest searchRequest = createSearchRequest();
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
Expand All @@ -69,18 +76,22 @@ public void testClusterAliasSerialization() throws IOException {
//TODO update version after backport
if (version.before(Version.V_7_0_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
assertNull(deserializedRequest.getAbsoluteStartMillis());
} else {
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
assertEquals(searchRequest.getAbsoluteStartMillis(), deserializedRequest.getAbsoluteStartMillis());
}
}

//TODO rename and update version after backport
public void testReadFromPre7_0_0() throws IOException {
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
SearchRequest searchRequest = new SearchRequest(in);
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
assertNull(searchRequest.getAbsoluteStartMillis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;

public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {

public void testLocalClusterAlias() {
long nowInMillis = System.currentTimeMillis();
IndexRequest indexRequest = new IndexRequest("test");
indexRequest.id("1");
indexRequest.source("field", "value");
Expand All @@ -37,7 +41,7 @@ public void testLocalClusterAlias() {
assertEquals(RestStatus.CREATED, indexResponse.status());

{
SearchRequest searchRequest = new SearchRequest("local");
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
Expand All @@ -48,7 +52,7 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("");
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
Expand All @@ -59,4 +63,58 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
}

public void testAbsoluteStartMillis() {
{
IndexRequest indexRequest = new IndexRequest("test-1970.01.01");
indexRequest.id("1");
indexRequest.source("date", "1970-01-01");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
IndexRequest indexRequest = new IndexRequest("test-1982.01.01");
indexRequest.id("1");
indexRequest.source("date", "1982-01-01");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
SearchRequest searchRequest = new SearchRequest();
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("<test-{now/d}>");
searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(0, searchResponse.getTotalShards());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
searchRequest.indices("<test-{now/d}>");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
rangeQuery.gte("1970-01-01");
rangeQuery.lt("1982-01-01");
sourceBuilder.query(rangeQuery);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
}
}