diff --git a/docs/changelog/96161.yaml b/docs/changelog/96161.yaml new file mode 100644 index 0000000000000..a255a4e71083e --- /dev/null +++ b/docs/changelog/96161.yaml @@ -0,0 +1,6 @@ +pr: 96161 +summary: Skip shards when querying constant keyword fields +area: "Search" +type: enhancement +issues: + - 95541 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/SearchIdleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/SearchIdleIT.java index beed104d6b2f2..80ecf99171173 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/SearchIdleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/SearchIdleIT.java @@ -15,16 +15,23 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.ExistsQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.refresh.RefreshStats; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import java.util.Arrays; +import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -223,4 +230,175 @@ public void testSearchIdleStats() throws InterruptedException { assertTrue(Arrays.stream(statsResponse.getShards()).allMatch(x -> x.getSearchIdleTime() >= searchIdleAfter)); } + public void testSearchIdleBoolQueryMatchOneIndex() throws InterruptedException { + // GIVEN + final String idleIndex = "test1"; + final String activeIndex = "test2"; + // NOTE: we need many shards because shard pre-filtering and the "can match" phase + // are executed only if we have enough shards. + int idleIndexShardsCount = 3; + int activeIndexShardsCount = 3; + createIndex( + idleIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, idleIndexShardsCount) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "routing_field") + .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), "2021-05-10T00:00:00.000Z") + .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2021-05-11T00:00:00.000Z") + .build(), + "doc", + "keyword", + "type=keyword", + "@timestamp", + "type=date", + "routing_field", + "type=keyword,time_series_dimension=true" + ); + createIndex( + activeIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, activeIndexShardsCount) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "routing_field") + .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), "2021-05-12T00:00:00.000Z") + .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2021-05-13T23:59:59.999Z") + .build(), + "doc", + "keyword", + "type=keyword", + "@timestamp", + "type=date", + "routing_field", + "type=keyword,time_series_dimension=true" + ); + + assertEquals( + RestStatus.CREATED, + client().prepareIndex(idleIndex) + .setSource("keyword", "idle", "@timestamp", "2021-05-10T19:00:03.765Z", "routing_field", "aaa") + .get() + .status() + ); + assertEquals( + RestStatus.CREATED, + client().prepareIndex(activeIndex) + .setSource("keyword", "active", "@timestamp", "2021-05-12T20:07:12.112Z", "routing_field", "aaa") + .get() + .status() + ); + assertEquals(RestStatus.OK, client().admin().indices().prepareRefresh(idleIndex, activeIndex).get().getStatus()); + + waitUntil( + () -> Arrays.stream(client().admin().indices().prepareStats(idleIndex, activeIndex).get().getShards()) + .allMatch(ShardStats::isSearchIdle), + 2, + TimeUnit.SECONDS + ); + + final IndicesStatsResponse idleIndexStatsBefore = client().admin().indices().prepareStats("test1").get(); + assertIdleShard(idleIndexStatsBefore); + + final IndicesStatsResponse activeIndexStatsBefore = client().admin().indices().prepareStats("test2").get(); + assertIdleShard(activeIndexStatsBefore); + + // WHEN + final SearchResponse searchResponse = client().prepareSearch("test*") + .setQuery(new RangeQueryBuilder("@timestamp").from("2021-05-12T20:00:00.000Z").to("2021-05-12T21:00:00.000Z")) + .setPreFilterShardSize(5) + .get(); + + // THEN + assertEquals(RestStatus.OK, searchResponse.status()); + assertEquals(idleIndexShardsCount + activeIndexShardsCount - 1, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getFailedShards()); + Arrays.stream(searchResponse.getHits().getHits()).forEach(searchHit -> assertEquals("test2", searchHit.getIndex())); + // NOTE: we need an empty result from at least one shard + assertEquals(1, searchResponse.getHits().getHits().length); + final IndicesStatsResponse idleIndexStatsAfter = client().admin().indices().prepareStats(idleIndex).get(); + assertIdleShardsRefreshStats(idleIndexStatsBefore, idleIndexStatsAfter); + } + + public void testSearchIdleExistsQueryMatchOneIndex() throws InterruptedException { + // GIVEN + final String idleIndex = "test1"; + final String activeIndex = "test2"; + // NOTE: we need many shards because shard pre-filtering and the "can match" phase + // are executed only if we have enough shards. + int idleIndexShardsCount = 3; + int activeIndexShardsCount = 3; + createIndex( + idleIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, idleIndexShardsCount) + .build(), + "doc", + "keyword", + "type=keyword" + ); + createIndex( + activeIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, activeIndexShardsCount) + .build(), + "doc", + "keyword", + "type=keyword" + ); + + assertEquals(RestStatus.CREATED, client().prepareIndex(idleIndex).setSource("keyword", "idle").get().status()); + assertEquals( + RestStatus.CREATED, + client().prepareIndex(activeIndex).setSource("keyword", "active", "unmapped", "bbb").get().status() + ); + assertEquals(RestStatus.OK, client().admin().indices().prepareRefresh(idleIndex, activeIndex).get().getStatus()); + + waitUntil( + () -> Arrays.stream(client().admin().indices().prepareStats(idleIndex, activeIndex).get().getShards()) + .allMatch(ShardStats::isSearchIdle), + 2, + TimeUnit.SECONDS + ); + + final IndicesStatsResponse idleIndexStatsBefore = client().admin().indices().prepareStats("test1").get(); + assertIdleShard(idleIndexStatsBefore); + + final IndicesStatsResponse activeIndexStatsBefore = client().admin().indices().prepareStats("test2").get(); + assertIdleShard(activeIndexStatsBefore); + + // WHEN + final SearchResponse searchResponse = client().prepareSearch("test*") + .setQuery(new ExistsQueryBuilder("unmapped")) + .setPreFilterShardSize(5) + .get(); + + // THEN + assertEquals(RestStatus.OK, searchResponse.status()); + assertEquals(idleIndexShardsCount, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getFailedShards()); + Arrays.stream(searchResponse.getHits().getHits()).forEach(searchHit -> assertEquals("test2", searchHit.getIndex())); + // NOTE: we need an empty result from at least one shard + assertEquals(1, searchResponse.getHits().getHits().length); + final IndicesStatsResponse idleIndexStatsAfter = client().admin().indices().prepareStats(idleIndex).get(); + assertIdleShardsRefreshStats(idleIndexStatsBefore, idleIndexStatsAfter); + } + + private static void assertIdleShard(final IndicesStatsResponse statsResponse) { + Arrays.stream(statsResponse.getShards()).forEach(shardStats -> assertTrue(shardStats.isSearchIdle())); + Arrays.stream(statsResponse.getShards()).forEach(shardStats -> assertTrue(shardStats.getSearchIdleTime() >= 100)); + } + + private static void assertIdleShardsRefreshStats(final IndicesStatsResponse before, final IndicesStatsResponse after) { + assertNotEquals(0, before.getShards().length); + assertNotEquals(0, after.getShards().length); + final List refreshStatsBefore = Arrays.stream(before.getShards()).map(x -> x.getStats().refresh).toList(); + final List refreshStatsAfter = Arrays.stream(after.getShards()).map(x -> x.getStats().refresh).toList(); + assertEquals(refreshStatsBefore.size(), refreshStatsAfter.size()); + assertTrue(refreshStatsAfter.containsAll(refreshStatsBefore)); + assertTrue(refreshStatsBefore.containsAll(refreshStatsAfter)); + } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 41ac48f8a8700..01e47863b6a5d 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -49,9 +50,14 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsAccounting; import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperRegistry; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.MappingParserContext; import org.elasticsearch.index.mapper.NodeMappingStats; +import org.elasticsearch.index.mapper.RuntimeField; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.query.SearchIndexNameMatcher; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; @@ -76,6 +82,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.XContentParserConfiguration; import java.io.Closeable; @@ -661,6 +668,46 @@ public SearchExecutionContext newSearchExecutionContext( ); } + /** + * Creates a new {@link QueryRewriteContext}. + * This class is used to rewrite queries in case access to the index is not required, since we can + * decide rewriting based on mappings alone. This saves the cost of pulling an index searcher as + * well as the associated cost of refreshing idle shards. + */ + public QueryRewriteContext newQueryRewriteContext( + final LongSupplier nowInMillis, + final Map runtimeMappings, + final String clusterAlias + ) { + final SearchIndexNameMatcher indexNameMatcher = new SearchIndexNameMatcher( + index().getName(), + clusterAlias, + clusterService, + expressionResolver + ); + final MapperService mapperService = mapperService(); + final MappingLookup mappingLookup = mapperService().mappingLookup(); + return new QueryRewriteContext( + parserConfiguration, + client, + nowInMillis, + mapperService, + mappingLookup, + parseRuntimeMappings(runtimeMappings, mapperService, indexSettings, mappingLookup), + null, + indexSettings, + new Index( + RemoteClusterAware.buildRemoteIndexName(clusterAlias, indexSettings.getIndex().getName()), + indexSettings.getIndex().getUUID() + ), + indexNameMatcher, + namedWriteableRegistry, + valuesSourceRegistry, + allowExpensiveQueries, + scriptService + ); + } + /** * The {@link ThreadPool} to use for this index. */ @@ -1212,4 +1259,28 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String... return clearedAtLeastOne; } + public static Map parseRuntimeMappings( + Map runtimeMappings, + MapperService mapperService, + IndexSettings indexSettings, + MappingLookup lookup + ) { + if (runtimeMappings.isEmpty()) { + return Collections.emptyMap(); + } + // TODO add specific tests to SearchExecutionTests similar to the ones in FieldTypeLookupTests + MappingParserContext parserContext = mapperService.parserContext(); + Map runtimeFields = RuntimeField.parseRuntimeFields(new HashMap<>(runtimeMappings), parserContext, false); + Map runtimeFieldTypes = RuntimeField.collectFieldTypes(runtimeFields.values()); + if (false == indexSettings.getIndexMetadata().getRoutingPaths().isEmpty()) { + for (String r : runtimeMappings.keySet()) { + if (Regex.simpleMatch(indexSettings.getIndexMetadata().getRoutingPaths(), r)) { + throw new IllegalArgumentException("runtime fields may not match [routing_path] but [" + r + "] matched"); + } + } + } + runtimeFieldTypes.keySet().forEach(lookup::validateDoesNotShadow); + return runtimeFieldTypes; + } + } diff --git a/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java b/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java index 65076cf91ec1a..4ab37d9ebf9ee 100644 --- a/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java @@ -45,7 +45,6 @@ import org.elasticsearch.index.mapper.MappingParserContext; import org.elasticsearch.index.mapper.NestedLookup; import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.RuntimeField; import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.query.support.NestedScope; @@ -76,6 +75,8 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; +import static org.elasticsearch.index.IndexService.parseRuntimeMappings; + /** * The context used to execute a search request on a shard. It provides access * to required information like mapping definitions and document data. @@ -615,13 +616,15 @@ public IndexSettings getIndexSettings() { } /** Return the current {@link IndexReader}, or {@code null} if no index reader is available, - * for instance if this rewrite context is used to index queries (percolation). */ + * for instance if this rewrite context is used to index queries (percolation). + */ public IndexReader getIndexReader() { return searcher == null ? null : searcher.getIndexReader(); } - /** Return the current {@link IndexSearcher}, or {@code null} if no index reader is available, - * for instance if this rewrite context is used to index queries (percolation). */ + /** Return the current {@link IndexSearcher}, or {@code null} if no index reader is available, which happens + * if this rewrite context is used to index queries (percolation). + */ public IndexSearcher searcher() { return searcher; } @@ -645,30 +648,6 @@ public boolean fieldExistsInIndex(String fieldname) { return fieldsInIndex.contains(fieldname); } - private static Map parseRuntimeMappings( - Map runtimeMappings, - MapperService mapperService, - IndexSettings indexSettings, - MappingLookup lookup - ) { - if (runtimeMappings.isEmpty()) { - return Collections.emptyMap(); - } - // TODO add specific tests to SearchExecutionTests similar to the ones in FieldTypeLookupTests - MappingParserContext parserContext = mapperService.parserContext(); - Map runtimeFields = RuntimeField.parseRuntimeFields(new HashMap<>(runtimeMappings), parserContext, false); - Map runtimeFieldTypes = RuntimeField.collectFieldTypes(runtimeFields.values()); - if (false == indexSettings.getIndexMetadata().getRoutingPaths().isEmpty()) { - for (String r : runtimeMappings.keySet()) { - if (Regex.simpleMatch(indexSettings.getIndexMetadata().getRoutingPaths(), r)) { - throw new IllegalArgumentException("runtime fields may not match [routing_path] but [" + r + "] matched"); - } - } - } - runtimeFieldTypes.keySet().forEach(lookup::validateDoesNotShadow); - return runtimeFieldTypes; - } - /** * Cache key for current mapping. */ diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ea248eea55a04..95ae4bb5aef22 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1544,6 +1544,9 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check readerContext = findReaderContext(request.readerId(), request); releasable = readerContext.markAsUsed(getKeepAlive(request)); indexService = readerContext.indexService(); + if (canMatchAfterRewrite(request, indexService) == false) { + return new CanMatchShardResponse(false, null); + } searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); } catch (SearchContextMissingException e) { final String searcherId = request.readerId().getSearcherId(); @@ -1551,6 +1554,9 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check throw e; } indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + if (canMatchAfterRewrite(request, indexService) == false) { + return new CanMatchShardResponse(false, null); + } IndexShard indexShard = indexService.getShard(request.shardId().getId()); final Engine.SearcherSupplier searcherSupplier = indexShard.acquireSearcherSupplier(); if (searcherId.equals(searcherSupplier.getSearcherId()) == false) { @@ -1563,6 +1569,9 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check canMatchSearcher = searcher; } else { indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + if (canMatchAfterRewrite(request, indexService) == false) { + return new CanMatchShardResponse(false, null); + } IndexShard indexShard = indexService.getShard(request.shardId().getId()); boolean needsWaitForRefresh = request.waitForCheckpoint() != UNASSIGNED_SEQ_NO; // If this request wait_for_refresh behavior, it is safest to assume a refresh is pending. Theoretically, @@ -1595,6 +1604,22 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check } } + /** + * This method tries to rewrite a query without using a {@link SearchExecutionContext}. It takes advantage of the fact that + * we can skip some shards in the query phase because we have enough information in the index mapping to decide the 'can match' + * outcome. One such example is a term based query against a constant keyword field. This queries can rewrite themselves to a + * {@link MatchNoneQueryBuilder}. This allows us to avoid extra work for example making the shard search active and waiting for + * refreshes. + */ + private static boolean canMatchAfterRewrite(final ShardSearchRequest request, final IndexService indexService) throws IOException { + final QueryRewriteContext queryRewriteContext = indexService.newQueryRewriteContext( + request::nowInMillis, + request.getRuntimeMappings(), + request.getClusterAlias() + ); + return queryStillMatchesAfterRewrite(request, queryRewriteContext); + } + @SuppressWarnings("unchecked") public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException { Rewriteable.rewrite(request.getRewriteable(), context, false); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 3b4c1de5f444f..e0578b9f7968d 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -971,7 +971,7 @@ public void testCanMatch() throws Exception { new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, AliasFilter.EMPTY, 1f, -1, null) ).canMatch() ); - assertEquals(6, numWrapInvocations.get()); + assertEquals(5, numWrapInvocations.get()); ShardSearchRequest request = new ShardSearchRequest( OriginalIndices.NONE, @@ -1027,13 +1027,13 @@ public void testCanMatch() throws Exception { CountDownLatch latch = new CountDownLatch(1); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); - assertEquals(8, numWrapInvocations.get()); + assertEquals(7, numWrapInvocations.get()); service.executeQueryPhase(request, task, new ActionListener() { @Override public void onResponse(SearchPhaseResult searchPhaseResult) { try { // make sure that the wrapper is called when the query is actually executed - assertEquals(9, numWrapInvocations.get()); + assertEquals(8, numWrapInvocations.get()); } finally { latch.countDown(); } diff --git a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/SearchIdleTests.java b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/SearchIdleTests.java new file mode 100644 index 0000000000000..87844104b0cdd --- /dev/null +++ b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/SearchIdleTests.java @@ -0,0 +1,366 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.constantkeyword.mapper; + +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.MatchPhraseQueryBuilder; +import org.elasticsearch.index.query.WildcardQueryBuilder; +import org.elasticsearch.index.refresh.RefreshStats; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.constantkeyword.ConstantKeywordMapperPlugin; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +public class SearchIdleTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(ConstantKeywordMapperPlugin.class); + } + + public void testCanMatchAfterRewrite() throws IOException { + final String indexName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT); + final String fieldName = randomAlphaOfLength(10); + final String matchingValue = randomAlphaOfLength(10); + final String nonMatchingValue = randomAlphaOfLength(8); + final XContentBuilder mapping = JsonXContent.contentBuilder() + .startObject() + .startObject("properties") + .startObject(fieldName) + .field("type", "constant_keyword") + .field("value", matchingValue) + .endObject() + .endObject() + .endObject(); + + createIndex(indexName, Settings.EMPTY, mapping); + final SearchService service = getInstanceFromNode(SearchService.class); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService indexService = indicesService.indexServiceSafe(resolveIndex(indexName)); + final IndexShard indexShard = indexService.getShard(0); + + final SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + + searchRequest.source(new SearchSourceBuilder().query(new MatchPhraseQueryBuilder(fieldName, matchingValue))); + assertTrue(canMatch(service, indexShard, searchRequest)); + + searchRequest.source(new SearchSourceBuilder().query(new MatchPhraseQueryBuilder(fieldName, nonMatchingValue))); + assertFalse(canMatch(service, indexShard, searchRequest)); + } + + public void testSearchIdleConstantKeywordMatchNoIndex() throws InterruptedException { + // GIVEN + final String idleIndex = "test1"; + final String activeIndex = "test2"; + // NOTE: we need many shards because shard pre-filtering and the "can match" phase + // are executed only if we have enough shards. + int idleIndexShardsCount = 3; + int activeIndexShardsCount = 3; + createIndex( + idleIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, idleIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=constant_value1", + "keyword", + "type=keyword" + ); + createIndex( + activeIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, activeIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=constant_value2", + "keyword", + "type=keyword" + ); + + assertEquals(RestStatus.CREATED, client().prepareIndex(idleIndex).setSource("keyword", randomAlphaOfLength(10)).get().status()); + assertEquals(RestStatus.CREATED, client().prepareIndex(activeIndex).setSource("keyword", randomAlphaOfLength(10)).get().status()); + assertEquals(RestStatus.OK, client().admin().indices().prepareRefresh(idleIndex, activeIndex).get().getStatus()); + + waitUntil( + () -> Arrays.stream(client().admin().indices().prepareStats(idleIndex, activeIndex).get().getShards()) + .allMatch(ShardStats::isSearchIdle), + 2, + TimeUnit.SECONDS + ); + + final IndicesStatsResponse beforeStatsResponse = client().admin().indices().prepareStats("test*").get(); + assertIdleShard(beforeStatsResponse); + + // WHEN + final SearchResponse searchResponse = search("test*", "constant_keyword", randomAlphaOfLength(5), 5); + assertEquals(RestStatus.OK, searchResponse.status()); + // NOTE: we need an empty result from at least one shard + assertEquals(idleIndexShardsCount + activeIndexShardsCount - 1, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getFailedShards()); + assertEquals(0, searchResponse.getHits().getHits().length); + + // THEN + final IndicesStatsResponse afterStatsResponse = client().admin().indices().prepareStats("test*").get(); + + assertIdleShardsRefreshStats(beforeStatsResponse, afterStatsResponse); + } + + public void testSearchIdleConstantKeywordMatchOneIndex() throws InterruptedException { + // GIVEN + final String idleIndex = "test1"; + final String activeIndex = "test2"; + // NOTE: we need many shards because shard pre-filtering and the "can match" phase + // are executed only if we have enough shards. + int idleIndexShardsCount = 3; + int activeIndexShardsCount = 3; + createIndex( + idleIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, idleIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=constant_value1", + "keyword", + "type=keyword" + ); + createIndex( + activeIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, activeIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=constant_value2", + "keyword", + "type=keyword" + ); + + assertEquals(RestStatus.CREATED, client().prepareIndex(idleIndex).setSource("keyword", randomAlphaOfLength(10)).get().status()); + assertEquals(RestStatus.CREATED, client().prepareIndex(activeIndex).setSource("keyword", randomAlphaOfLength(10)).get().status()); + assertEquals(RestStatus.OK, client().admin().indices().prepareRefresh(idleIndex, activeIndex).get().getStatus()); + + waitUntil( + () -> Arrays.stream(client().admin().indices().prepareStats(idleIndex, activeIndex).get().getShards()) + .allMatch(ShardStats::isSearchIdle), + 2, + TimeUnit.SECONDS + ); + + final IndicesStatsResponse idleIndexStatsBefore = client().admin().indices().prepareStats("test1").get(); + assertIdleShard(idleIndexStatsBefore); + + final IndicesStatsResponse activeIndexStatsBefore = client().admin().indices().prepareStats("test2").get(); + assertIdleShard(activeIndexStatsBefore); + + // WHEN + final SearchResponse searchResponse = search("test*", "constant_keyword", "constant_value2", 5); + assertEquals(RestStatus.OK, searchResponse.status()); + assertEquals(idleIndexShardsCount, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getFailedShards()); + Arrays.stream(searchResponse.getHits().getHits()).forEach(searchHit -> assertEquals("test2", searchHit.getIndex())); + + // THEN + final IndicesStatsResponse idleIndexStatsAfter = client().admin().indices().prepareStats(idleIndex).get(); + assertIdleShardsRefreshStats(idleIndexStatsBefore, idleIndexStatsAfter); + } + + public void testSearchIdleConstantKeywordMatchTwoIndices() throws InterruptedException { + // GIVEN + final String idleIndex = "test1"; + final String activeIndex = "test2"; + // NOTE: we need many shards because shard pre-filtering and the "can match" phase + // are executed only if we have enough shards. + int idleIndexShardsCount = 3; + int activeIndexShardsCount = 3; + createIndex( + idleIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, idleIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=constant", + "keyword", + "type=keyword" + ); + createIndex( + activeIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, activeIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=constant", + "keyword", + "type=keyword" + ); + + assertEquals(RestStatus.CREATED, client().prepareIndex(idleIndex).setSource("keyword", randomAlphaOfLength(10)).get().status()); + assertEquals(RestStatus.CREATED, client().prepareIndex(activeIndex).setSource("keyword", randomAlphaOfLength(10)).get().status()); + assertEquals(RestStatus.OK, client().admin().indices().prepareRefresh(idleIndex, activeIndex).get().getStatus()); + + waitUntil( + () -> Arrays.stream(client().admin().indices().prepareStats(idleIndex, activeIndex).get().getShards()) + .allMatch(ShardStats::isSearchIdle), + 2, + TimeUnit.SECONDS + ); + + final IndicesStatsResponse beforeStatsResponse = client().admin().indices().prepareStats("test*").get(); + assertIdleShard(beforeStatsResponse); + + // WHEN + final SearchResponse searchResponse = search("test*", "constant_keyword", "constant", 5); + + // THEN + assertEquals(RestStatus.OK, searchResponse.status()); + assertEquals(0, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getFailedShards()); + assertArrayEquals( + new String[] { "test1", "test2" }, + Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getIndex).sorted().toArray() + ); + final IndicesStatsResponse afterStatsResponse = client().admin().indices().prepareStats("test*").get(); + assertIdleShardsRefreshStats(beforeStatsResponse, afterStatsResponse); + } + + public void testSearchIdleWildcardQueryMatchOneIndex() throws InterruptedException { + // GIVEN + final String idleIndex = "test1"; + final String activeIndex = "test2"; + // NOTE: we need many shards because shard pre-filtering and the "can match" phase + // are executed only if we have enough shards. + int idleIndexShardsCount = 3; + int activeIndexShardsCount = 3; + createIndex( + idleIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, idleIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=test1_value" + ); + createIndex( + activeIndex, + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), "500ms") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, activeIndexShardsCount) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build(), + "doc", + "constant_keyword", + "type=constant_keyword,value=test2_value" + ); + + assertEquals(RestStatus.CREATED, client().prepareIndex(idleIndex).setSource("keyword", "value").get().status()); + assertEquals(RestStatus.CREATED, client().prepareIndex(activeIndex).setSource("keyword", "value").get().status()); + assertEquals(RestStatus.OK, client().admin().indices().prepareRefresh(idleIndex, activeIndex).get().getStatus()); + + waitUntil( + () -> Arrays.stream(client().admin().indices().prepareStats(idleIndex, activeIndex).get().getShards()) + .allMatch(ShardStats::isSearchIdle), + 2, + TimeUnit.SECONDS + ); + + final IndicesStatsResponse idleIndexStatsBefore = client().admin().indices().prepareStats("test1").get(); + assertIdleShard(idleIndexStatsBefore); + + final IndicesStatsResponse activeIndexStatsBefore = client().admin().indices().prepareStats("test2").get(); + assertIdleShard(activeIndexStatsBefore); + + // WHEN + final SearchResponse searchResponse = client().prepareSearch("test*") + .setQuery(new WildcardQueryBuilder("constant_keyword", "test2*")) + .setPreFilterShardSize(5) + .get(); + + // THEN + assertEquals(RestStatus.OK, searchResponse.status()); + assertEquals(idleIndexShardsCount, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getFailedShards()); + Arrays.stream(searchResponse.getHits().getHits()).forEach(searchHit -> assertEquals("test2", searchHit.getIndex())); + final IndicesStatsResponse idleIndexStatsAfter = client().admin().indices().prepareStats(idleIndex).get(); + assertIdleShardsRefreshStats(idleIndexStatsBefore, idleIndexStatsAfter); + } + + private SearchResponse search(final String index, final String field, final String value, int preFilterShardSize) { + return client().prepareSearch(index) + .setQuery(new MatchPhraseQueryBuilder(field, value)) + .setPreFilterShardSize(preFilterShardSize) + .get(); + } + + private static void assertIdleShard(final IndicesStatsResponse statsResponse) { + Arrays.stream(statsResponse.getShards()).forEach(shardStats -> assertTrue(shardStats.isSearchIdle())); + Arrays.stream(statsResponse.getShards()).forEach(shardStats -> assertTrue(shardStats.getSearchIdleTime() >= 100)); + } + + private static void assertIdleShardsRefreshStats(final IndicesStatsResponse before, final IndicesStatsResponse after) { + assertNotEquals(0, before.getShards().length); + assertNotEquals(0, after.getShards().length); + final List refreshStatsBefore = Arrays.stream(before.getShards()) + .map(shardStats -> shardStats.getStats().refresh) + .toList(); + final List refreshStatsAfter = Arrays.stream(after.getShards()) + .map(shardStats -> shardStats.getStats().refresh) + .toList(); + assertEquals(refreshStatsBefore.size(), refreshStatsAfter.size()); + assertTrue(refreshStatsAfter.containsAll(refreshStatsBefore)); + assertTrue(refreshStatsBefore.containsAll(refreshStatsAfter)); + } + + private static boolean canMatch(final SearchService service, final IndexShard indexShard, final SearchRequest request) + throws IOException { + return service.canMatch( + new ShardSearchRequest(OriginalIndices.NONE, request, indexShard.shardId(), 0, 1, AliasFilter.EMPTY, 1f, -1, null) + ).canMatch(); + } +}