From f3df41e148566b376f0e16aab77724ccd5777056 Mon Sep 17 00:00:00 2001
From: Mark Tozzi
Date: Thu, 6 Feb 2025 11:15:01 -0500
Subject: [PATCH] Aggregations cancellation after collection (#120944)

This PR addresses issues around aggregations cancellation, mentioned in
https://github.com/elastic/elasticsearch/issues/108701 and other places.
In brief, during aggregation collection we respect cancellation via the
mechanisms in the searcher that poison cancelled queries. But once the
aggregation finishes collection, there is no further need to interact
with the searcher, so we cannot rely on it for cancellation checking. In
particular, deeply nested aggregations can spend a long time
constructing the results tree.

Checking for cancellation is a trade-off: the check itself is somewhat
expensive (it involves a volatile read), so we want to check often
enough that cancelled queries don't keep consuming resources for a long
time, but not so frequently that the check slows down ordinary
aggregation queries. Our first attempt at this is to check once when we
go to build sub-aggregations, as the worst cases we have seen involve
building deep sub-aggregation trees. Checking at sub-aggregation
construction time also provides a conveniently centralized method call
to add the check to.
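
In sketch form, the pattern looks roughly like the following. This is an
illustrative sketch only: the class and method names are hypothetical
stand-ins, not the real Elasticsearch types. In this PR the actual check
lands in BucketsAggregator#buildSubAggsForBuckets, with a similar check
in AutoDateHistogramAggregator's rebucketing loop (see the diff below).

    import java.util.function.BooleanSupplier;

    // Hypothetical stand-in types for illustration; not the real classes.
    class ResultTreeSketch {
        static final class TaskCancelledException extends RuntimeException {
            TaskCancelledException(String message) {
                super(message);
            }
        }

        /** Builds one level of a results tree, checking cancellation once per level. */
        static int buildLevel(BooleanSupplier isCancelled, int depth) {
            // One volatile read per tree level: cheap next to the work of
            // building the level, but enough to stop a deep tree shortly
            // after cancellation.
            if (isCancelled.getAsBoolean()) {
                throw new TaskCancelledException("cancelled");
            }
            if (depth == 0) {
                return 1; // a leaf bucket
            }
            int buckets = 0;
            for (int sub = 0; sub < 4; sub++) { // pretend each level fans out into 4 sub-aggregations
                buckets += buildLevel(isCancelled, depth - 1);
            }
            return buckets;
        }

        public static void main(String[] args) {
            System.out.println(buildLevel(() -> false, 5)); // prints 1024
        }
    }

Checking once per level keeps the volatile read off the per-document hot
path while still bounding how much work a cancelled query can do after
collection finishes.
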
---------

Co-authored-by: elasticsearchmachine
Co-authored-by: Nik Everett
---
 docs/changelog/120944.yaml                    |   6 +
 .../AutoDateHistogramAggregator.java          |  11 +-
 .../metric/MatrixStatsAggregatorTests.java    |   6 +-
 .../bucket/BucketsAggregator.java             |   5 +
 .../aggregations/AggregatorTestCase.java      | 183 +++++++++--
 x-pack/plugin/analytics/build.gradle          |   5 +
 .../multiterms/AggsTimeoutIT.java             | 300 ++++++++++++++++++
 7 files changed, 478 insertions(+), 38 deletions(-)
 create mode 100644 docs/changelog/120944.yaml
 create mode 100644 x-pack/plugin/analytics/src/javaRestTest/java/org/elasticsearch/multiterms/AggsTimeoutIT.java

diff --git a/docs/changelog/120944.yaml b/docs/changelog/120944.yaml
new file mode 100644
index 0000000000000..6f9df3f29393e
--- /dev/null
+++ b/docs/changelog/120944.yaml
@@ -0,0 +1,6 @@
+pr: 120944
+summary: Aggregations cancellation after collection
+area: Aggregations
+type: bug
+issues:
+ - 108701
diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index 6add1b0ac4a13..abd482d8298ef 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -35,6 +35,7 @@
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.tasks.TaskCancelledException;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -573,7 +574,15 @@ private void rebucket() {
             long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
             bucketOrds = new LongKeyedBucketOrds.FromMany(bigArrays());
             success = true;
-            for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
+            long maxOwning = oldOrds.maxOwningBucketOrd();
+            for (long owningBucketOrd = 0; owningBucketOrd <= maxOwning; owningBucketOrd++) {
+                /*
+                 * Check for cancellation during this tight loop, as it can take a while and the
+                 * standard cancellation checks don't run while the loop is executing.
+                 */
+                if (context.isCancelled()) {
+                    throw new TaskCancelledException("cancelled");
+                }
                 LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd);
                 Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)];
                 while (ordsEnum.next()) {
diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/metric/MatrixStatsAggregatorTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/metric/MatrixStatsAggregatorTests.java
index 74c1f3c16278f..2eb21cfc09650 100644
--- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/metric/MatrixStatsAggregatorTests.java
+++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/metric/MatrixStatsAggregatorTests.java
@@ -36,7 +36,7 @@ public void testNoData() throws Exception {
             MatrixStatsAggregationBuilder aggBuilder = new MatrixStatsAggregationBuilder("my_agg").fields(
                 Collections.singletonList("field")
             );
-            InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft));
+            InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft).noReductionCancellation());
             assertNull(stats.getStats());
             assertEquals(0L, stats.getDocCount());
         }
@@ -54,7 +54,7 @@ public void testUnmapped() throws Exception {
             MatrixStatsAggregationBuilder aggBuilder = new MatrixStatsAggregationBuilder("my_agg").fields(
                 Collections.singletonList("bogus")
             );
-            InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft));
+            InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ft).noReductionCancellation());
             assertNull(stats.getStats());
             assertEquals(0L, stats.getDocCount());
         }
@@ -88,7 +88,7 @@ public void testTwoFields() throws Exception {
             MatrixStatsAggregationBuilder aggBuilder = new MatrixStatsAggregationBuilder("my_agg").fields(
                 Arrays.asList(fieldA, fieldB)
             );
-            InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ftA, ftB));
+            InternalMatrixStats stats = searchAndReduce(reader, new AggTestConfig(aggBuilder, ftA, ftB).noReductionCancellation());
             multiPassStats.assertNearlyEqual(stats);
             assertTrue(MatrixAggregationInspectionHelper.hasValue(stats));
         }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
index 592f7b4887598..e85d01930807c 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
@@ -26,6 +26,7 @@
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.aggregations.support.AggregationPath;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskCancelledException;
 
 import java.io.IOException;
 import java.util.AbstractList;
@@ -163,6 +164,10 @@ protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {}
      * array of ordinals
      */
     protected final IntFunction buildSubAggsForBuckets(LongArray bucketOrdsToCollect) throws IOException {
+        if (context.isCancelled()) {
+            throw new TaskCancelledException("not building sub-aggregations due to task cancellation");
+        }
+
         prepareSubAggs(bucketOrdsToCollect);
         InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
         for (int i = 0; i < subAggregators.length; i++) {
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index 9e2dee4d94212..d034e6e6679c1 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -129,6 +129,7 @@
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
 import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation;
@@ -149,6 +150,7 @@
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.internal.SubSearchContext;
+import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.InternalAggregationTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -251,29 +253,12 @@ protected List<SearchPlugin> getSearchPlugins() {
         return List.of();
     }
 
-    /**
-     * Deprecated - this will be made private in a future update
-     */
-    @Deprecated
-    protected <A extends Aggregator> A createAggregator(
-        AggregationBuilder aggregationBuilder,
-        IndexReader indexReader,
-        MappedFieldType... fieldTypes
-    ) throws IOException {
-        return createAggregator(aggregationBuilder, createAggregationContext(indexReader, new MatchAllDocsQuery(), fieldTypes));
-    }
-
     protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder, AggregationContext context) throws IOException {
         return createAggregator(new AggregatorFactories.Builder().addAggregator(aggregationBuilder), context);
     }
 
-    /**
-     * Deprecated - this will be made private in a future update
-     */
-    @Deprecated
-    protected <A extends Aggregator> A createAggregator(AggregatorFactories.Builder builder, AggregationContext context)
-        throws IOException {
+    private <A extends Aggregator> A createAggregator(AggregatorFactories.Builder builder, AggregationContext context) throws IOException {
         Aggregator[] aggregators = builder.build(context, null).createTopLevelAggregators();
         assertThat(aggregators.length, equalTo(1));
         @SuppressWarnings("unchecked")
@@ -310,10 +295,7 @@ protected AggregationContext createAggregationContext(IndexReader indexReader, Q
      * While {@linkplain AggregationContext} is {@link Releasable} the caller is
      * not responsible for releasing it. Instead, it is released automatically in
      * in {@link #cleanupReleasables()}.
-     *
-     * Deprecated - this will be made private in a future update
      */
-    @Deprecated
     protected AggregationContext createAggregationContext(
         IndexReader indexReader,
         IndexSettings indexSettings,
@@ -346,6 +328,56 @@ private AggregationContext createAggregationContext(
         int maxBucket,
         boolean isInSortOrderExecutionRequired,
         MappedFieldType... fieldTypes
+    ) {
+        return createAggregationContext(
+            searcher,
+            indexSettings,
+            query,
+            breakerService,
+            bytesToPreallocate,
+            maxBucket,
+            isInSortOrderExecutionRequired,
+            () -> false,
+            fieldTypes
+        );
+    }
+
+    /**
+     * Creates an aggregation context that will randomly report that the query has been cancelled
+     */
+    private AggregationContext createCancellingAggregationContext(
+        IndexSearcher searcher,
+        IndexSettings indexSettings,
+        Query query,
+        CircuitBreakerService breakerService,
+        long bytesToPreallocate,
+        int maxBucket,
+        boolean isInSortOrderExecutionRequired,
+        MappedFieldType... fieldTypes
+    ) {
+        return createAggregationContext(
+            searcher,
+            indexSettings,
+            query,
+            breakerService,
+            bytesToPreallocate,
+            maxBucket,
+            isInSortOrderExecutionRequired,
+            () -> ESTestCase.random().nextInt(20) == 0,
+            fieldTypes
+        );
+    }
+
+    private AggregationContext createAggregationContext(
+        IndexSearcher searcher,
+        IndexSettings indexSettings,
+        Query query,
+        CircuitBreakerService breakerService,
+        long bytesToPreallocate,
+        int maxBucket,
+        boolean isInSortOrderExecutionRequired,
+        Supplier<Boolean> isCancelled,
+        MappedFieldType... fieldTypes
     ) {
         MappingLookup mappingLookup = MappingLookup.fromMappers(
             Mapping.EMPTY,
@@ -409,7 +441,7 @@ public Iterable dimensionFields() {
             bitsetFilterCache,
             randomInt(),
             () -> 0L,
-            () -> false,
+            isCancelled,
             q -> q,
             true,
             isInSortOrderExecutionRequired
@@ -536,9 +568,11 @@ protected <A extends InternalAggregation> A searchAndReduce(IndexReader reader,
         IndexSettings indexSettings = createIndexSettings();
         // First run it to find circuit breaker leaks on the aggregator
         runWithCrankyCircuitBreaker(indexSettings, searcher, aggTestConfig);
-        // Second run it to the end
         CircuitBreakerService breakerService = new NoneCircuitBreakerService();
-        return searchAndReduce(indexSettings, searcher, breakerService, aggTestConfig);
+        // Next, try with random cancellations, again looking for leaks
+        runWithCancellingConfig(indexSettings, searcher, breakerService, aggTestConfig);
+        // Finally, run it to the end
+        return searchAndReduce(indexSettings, searcher, breakerService, aggTestConfig, this::createAggregationContext);
     }
 
@@ -552,7 +586,7 @@ private void runWithCrankyCircuitBreaker(IndexSettings indexSettings, IndexSearc
         CircuitBreakerService crankyService = new CrankyCircuitBreakerService();
         for (int i = 0; i < 5; i++) {
             try {
-                searchAndReduce(indexSettings, searcher, crankyService, aggTestConfig);
+                searchAndReduce(indexSettings, searcher, crankyService, aggTestConfig, this::createAggregationContext);
             } catch (CircuitBreakingException e) {
                 // Circuit breaks from the cranky breaker are expected - it randomly fails, after all
                 assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
@@ -560,12 +594,43 @@ private void runWithCrankyCircuitBreaker(IndexSettings indexSettings, IndexSearc
         }
     }
 
+    private void runWithCancellingConfig(
+        IndexSettings indexSettings,
+        IndexSearcher searcher,
+        CircuitBreakerService breakerService,
+        AggTestConfig aggTestConfig
+    ) throws IOException {
+        for (int i = 0; i < 5; i++) {
+            try {
+                searchAndReduce(indexSettings, searcher, breakerService, aggTestConfig, this::createCancellingAggregationContext);
+            } catch (TaskCancelledException e) {
+                // We don't use expectThrows here because the randomized context might never report cancellation,
+                // but it's also normal for it to throw.
+            }
+        }
+    }
+
+    @FunctionalInterface
+    public interface AggregationContextSupplier {
+        AggregationContext get(
+            IndexSearcher searcher,
+            IndexSettings indexSettings,
+            Query query,
+            CircuitBreakerService breakerService,
+            long bytesToPreallocate,
+            int maxBucket,
+            boolean isInSortOrderExecutionRequired,
+            MappedFieldType... fieldTypes
+        );
+    }
+
     @SuppressWarnings("unchecked")
     private <A extends InternalAggregation> A searchAndReduce(
         IndexSettings indexSettings,
         IndexSearcher searcher,
         CircuitBreakerService breakerService,
-        AggTestConfig aggTestConfig
+        AggTestConfig aggTestConfig,
+        AggregationContextSupplier contextSupplier
     ) throws IOException {
         Query query = aggTestConfig.query();
         AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(aggTestConfig.builder());
@@ -591,7 +656,7 @@ private <A extends InternalAggregation> A searchAndReduce(
                 subSearchers[searcherIDX] = new ShardSearcher(leave, compCTX);
             }
             for (ShardSearcher subSearcher : subSearchers) {
-                AggregationContext context = createAggregationContext(
+                AggregationContext context = contextSupplier.get(
                     subSearcher,
                     indexSettings,
                     query,
@@ -620,7 +685,7 @@ private <A extends InternalAggregation> A searchAndReduce(
                 }
             }
         } else {
-            AggregationContext context = createAggregationContext(
+            AggregationContext context = contextSupplier.get(
                 searcher,
                 indexSettings,
                 query,
@@ -688,8 +753,41 @@ private <A extends InternalAggregation> A searchAndReduce(
                 assertRoundTrip(internalAggregation.copyResults());
             }
         }
+        /* Verify that cancellation during final reduce correctly throws.
+         * We check reduce-time cancellation only when consuming buckets.
+         */
+        if (aggTestConfig.testReductionCancellation()) {
+            try {
+                // I can't remember if we mutate the InternalAggregations list, so make a defensive copy
+                List<InternalAggregations> internalAggsCopy = new ArrayList<>(internalAggs);
+                A internalAgg = doFinalReduce(maxBucket, bigArraysForReduction, builder, internalAggsCopy, true);
+                if (internalAgg instanceof MultiBucketsAggregation mb) {
+                    // Empty multi-bucket aggs are expected to return before even getting to the cancellation check
+                    assertEquals("Got non-empty result for a cancelled reduction", 0, mb.getBuckets().size());
+                } // other cases?
+            } catch (TaskCancelledException e) {
+                /* We may not always honor cancellation in reduce, for example if we are returning no results, so we can't
+                 * just expectThrows here.
+                 */
+            }
+        }
+
         // now do the final reduce
+        A internalAgg = doFinalReduce(maxBucket, bigArraysForReduction, builder, internalAggs, false);
+        assertRoundTrip(internalAgg);
+        if (aggTestConfig.builder instanceof ValuesSourceAggregationBuilder.MetricsAggregationBuilder) {
+            verifyMetricNames((ValuesSourceAggregationBuilder.MetricsAggregationBuilder) aggTestConfig.builder, internalAgg);
+        }
+        return internalAgg;
+    }
+
+    private <A extends InternalAggregation> A doFinalReduce(
+        int maxBucket,
+        BigArrays bigArraysForReduction,
+        Builder builder,
+        List<InternalAggregations> internalAggs,
+        boolean cancelled
+    ) throws IOException {
         MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(
             maxBucket,
             new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
@@ -697,7 +795,7 @@
         AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal(
             bigArraysForReduction,
             getMockScriptService(),
-            () -> false,
+            () -> cancelled,
             builder,
             reduceBucketConsumer
         );
@@ -707,10 +805,6 @@
         assertRoundTrip(internalAgg);
 
         doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer);
-        assertRoundTrip(internalAgg);
-        if (aggTestConfig.builder instanceof ValuesSourceAggregationBuilder.MetricsAggregationBuilder) {
-            verifyMetricNames((ValuesSourceAggregationBuilder.MetricsAggregationBuilder) aggTestConfig.builder, internalAgg);
-        }
         return internalAgg;
     }
 
@@ -1601,11 +1695,12 @@ public record AggTestConfig(
     boolean incrementalReduce,
     boolean useLogDocMergePolicy,
+    boolean testReductionCancellation,
     MappedFieldType... fieldTypes
 ) {
 
     public AggTestConfig(AggregationBuilder builder, MappedFieldType... fieldTypes) {
-        this(new MatchAllDocsQuery(), builder, DEFAULT_MAX_BUCKETS, randomBoolean(), true, randomBoolean(), false, fieldTypes);
+        this(new MatchAllDocsQuery(), builder, DEFAULT_MAX_BUCKETS, randomBoolean(), true, randomBoolean(), false, true, fieldTypes);
     }
 
     public AggTestConfig withQuery(Query query) {
@@ -1617,6 +1712,7 @@ public AggTestConfig withQuery(Query query) {
             shouldBeCached,
             incrementalReduce,
             useLogDocMergePolicy,
+            testReductionCancellation,
             fieldTypes
         );
     }
@@ -1630,6 +1726,7 @@ public AggTestConfig withSplitLeavesIntoSeperateAggregators(boolean splitLeavesI
             shouldBeCached,
             incrementalReduce,
             useLogDocMergePolicy,
+            testReductionCancellation,
             fieldTypes
         );
     }
@@ -1643,6 +1740,7 @@ public AggTestConfig withShouldBeCached(boolean shouldBeCached) {
             shouldBeCached,
             incrementalReduce,
             useLogDocMergePolicy,
+            testReductionCancellation,
             fieldTypes
         );
     }
@@ -1656,6 +1754,7 @@ public AggTestConfig withMaxBuckets(int maxBuckets) {
             shouldBeCached,
             incrementalReduce,
             useLogDocMergePolicy,
+            testReductionCancellation,
             fieldTypes
         );
     }
@@ -1669,6 +1768,7 @@ public AggTestConfig withIncrementalReduce(boolean incrementalReduce) {
             shouldBeCached,
             incrementalReduce,
             useLogDocMergePolicy,
+            testReductionCancellation,
             fieldTypes
         );
     }
@@ -1682,6 +1782,21 @@ public AggTestConfig withLogDocMergePolicy() {
             shouldBeCached,
             incrementalReduce,
             true,
+            testReductionCancellation,
+            fieldTypes
+        );
+    }
+
+    public AggTestConfig noReductionCancellation() {
+        return new AggTestConfig(
+            query,
+            builder,
+            maxBuckets,
+            splitLeavesIntoSeparateAggregators,
+            shouldBeCached,
+            incrementalReduce,
+            useLogDocMergePolicy,
+            false,
             fieldTypes
         );
     }
diff --git a/x-pack/plugin/analytics/build.gradle b/x-pack/plugin/analytics/build.gradle
index 9a21f40a4c4a9..7aaaaaf668643 100644
--- a/x-pack/plugin/analytics/build.gradle
+++ b/x-pack/plugin/analytics/build.gradle
@@ -7,6 +7,7 @@
 apply plugin: 'elasticsearch.internal-es-plugin'
 apply plugin: 'elasticsearch.internal-cluster-test'
+apply plugin: 'elasticsearch.internal-java-rest-test'
 
 esplugin {
     name = 'x-pack-analytics'
@@ -18,6 +19,10 @@ base {
     archivesName = 'x-pack-analytics'
 }
 
+tasks.named('javaRestTest') {
+    usesDefaultDistribution()
+}
+
 dependencies {
     api 'org.apache.commons:commons-math3:3.6.1'
     compileOnly project(path: xpackModule('core'))
diff --git a/x-pack/plugin/analytics/src/javaRestTest/java/org/elasticsearch/multiterms/AggsTimeoutIT.java b/x-pack/plugin/analytics/src/javaRestTest/java/org/elasticsearch/multiterms/AggsTimeoutIT.java
new file mode 100644
index 0000000000000..b3f32f0567b92
--- /dev/null
+++ b/x-pack/plugin/analytics/src/javaRestTest/java/org/elasticsearch/multiterms/AggsTimeoutIT.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.multiterms;
+
+import org.apache.http.client.config.RequestConfig;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.test.ListMatcher;
+import org.elasticsearch.test.MapMatcher;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.elasticsearch.test.ListMatcher.matchesList;
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+
+/**
+ * Runs slow aggregations with a timeout and asserts that they time out and
+ * cancel the queries.
+ */
+public class AggsTimeoutIT extends ESRestTestCase {
+    private static final int DEPTH = 10;
+    private static final int VALUE_COUNT = 4;
+    private static final int TOTAL_DOCS = Math.toIntExact((long) Math.pow(VALUE_COUNT, DEPTH));
+    private static final TimeValue TIMEOUT = TimeValue.timeValueSeconds(1);
+
+    @ClassRule
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .distribution(DistributionType.DEFAULT)
+        .setting("xpack.watcher.enabled", "false")
+        .setting("xpack.ml.enabled", "false")
+        .setting("xpack.security.enabled", "false")
+        .setting("xpack.security.transport.ssl.enabled", "false")
+        .setting("xpack.security.http.ssl.enabled", "false")
+        .jvmArg("-Xmx1g")
+        .build();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    public void testTerms() throws Exception {
+        Request request = new Request("POST", "/deep/_search");
+        XContentBuilder body = JsonXContent.contentBuilder().prettyPrint().startObject();
+        body.field("size", 0);
+        agg(body, "terms", 10);
+        request.setJsonEntity(Strings.toString(body.endObject()));
+        setTimeout(request);
+        try {
+            Map<?, ?> response = responseAsMap(client().performRequest(request));
+            assertMap("not expected to finish", response, matchesMap());
+        } catch (SocketTimeoutException timeout) {
+            logger.info("timed out");
+            assertNoSearchesRunning();
+        }
+    }
+
+    private void agg(XContentBuilder body, String type, int depth) throws IOException {
+        if (depth == 0) {
+            return;
+        }
+        body.startObject("aggs").startObject(field("agg", depth));
+        {
+            body.startObject(type);
+            body.field("field", field("kwd", depth - 1));
+            body.endObject();
+        }
+        agg(body, type, depth - 1);
+        body.endObject().endObject();
+    }
+
+    public void testMultiTerms() throws Exception {
+        Request request = new Request("POST", "/deep/_search");
+        XContentBuilder body = JsonXContent.contentBuilder().prettyPrint().startObject();
+        body.field("size", 0);
+        autoDateInMultiTerms(body, b -> {
+            for (int i = 0; i < DEPTH; i++) {
+                b.startObject().field("field", field("kwd", i)).endObject();
+            }
+        });
+        request.setJsonEntity(Strings.toString(body.endObject()));
+        setTimeout(request);
+        try {
+            Map<?, ?> response = responseAsMap(client().performRequest(request));
+            ListMatcher buckets = matchesList();
+            for (int i = 0; i < 10; i++) {
+                buckets = buckets.item(
+                    matchesMap().entry("key_as_string", any(String.class))
+                        .entry("key", hasSize(10))
+                        .entry("doc_count", 1)
+                        .entry("adh", matchesMap().entry("buckets", hasSize(1)).entry("interval", "1s"))
+                );
+            }
+            MapMatcher agg = matchesMap().entry("buckets", buckets)
+                .entry("doc_count_error_upper_bound", 0)
+                .entry("sum_other_doc_count", greaterThan(0));
+            assertMap(response, matchesMap().extraOk().entry("aggregations", matchesMap().entry("multi", agg)));
+        } catch (SocketTimeoutException timeout) {
+            logger.info("timed out");
+            assertNoSearchesRunning();
+        }
+    }
+
+    public void testMultiTermWithTimestamp() throws Exception {
+        Request request = new Request("POST", "/deep/_search");
+        XContentBuilder body = JsonXContent.contentBuilder().prettyPrint().startObject();
+        body.field("size", 0);
+        autoDateInMultiTerms(body, b -> {
+            b.startObject().field("field", field("kwd", 0)).endObject();
+            b.startObject().field("field", "@timestamp").endObject();
+        });
+        request.setJsonEntity(Strings.toString(body.endObject()));
+        setTimeout(request);
+        try {
+            Map<?, ?> response = responseAsMap(client().performRequest(request));
+            ListMatcher buckets = matchesList();
+            for (int i = 0; i < 10; i++) {
+                buckets = buckets.item(
+                    matchesMap().entry("key_as_string", any(String.class))
+                        .entry("key", hasSize(10))
+                        .entry("doc_count", 1)
+                        .entry("adh", matchesMap().entry("buckets", hasSize(1)).entry("interval", "1s"))
+                );
+            }
+            MapMatcher agg = matchesMap().entry("buckets", buckets)
+                .entry("doc_count_error_upper_bound", 0)
+                .entry("sum_other_doc_count", greaterThan(0));
+            assertMap(response, matchesMap().extraOk().entry("aggregations", matchesMap().entry("multi", agg)));
+        } catch (SocketTimeoutException timeout) {
+            logger.info("timed out");
+            assertNoSearchesRunning();
+        }
+    }
+
+    private void autoDateInMultiTerms(XContentBuilder body, CheckedConsumer<XContentBuilder, IOException> terms) throws IOException {
+        body.startObject("aggs").startObject("multi");
+        {
+            body.startObject("multi_terms");
+            {
+                body.startArray("terms");
+                terms.accept(body);
+                body.endArray();
+                body.startArray("order");
+                {
+                    body.startObject().field("_count", "desc").endObject();
+                    body.startObject().field("_key", "asc").endObject();
+                }
+                body.endArray();
+            }
+            body.endObject();
+            body.startObject("aggs").startObject("adh").startObject("auto_date_histogram");
+            {
+                body.field("field", "@timestamp");
+                body.field("buckets", 1);
+            }
+            body.endObject().endObject().endObject();
+        }
+        body.endObject().endObject();
+    }
+
+    @Before
+    public void createDeep() throws IOException {
+        if (indexExists("deep")) {
+            return;
+        }
+        logger.info("creating deep index");
+        XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
+        mapping.startObject("properties");
+        mapping.startObject("@timestamp").field("type", "date").endObject();
+        for (int f = 0; f < DEPTH; f++) {
+            mapping.startObject(field("kwd", f)).field("type", "keyword").endObject();
+        }
+        CreateIndexResponse createIndexResponse = createIndex(
+            "deep",
+            Settings.builder().put("index.number_of_replicas", 0).build(),
+            Strings.toString(mapping.endObject().endObject())
+        );
+        assertThat(createIndexResponse.isAcknowledged(), equalTo(true));
+        Bulk bulk = new Bulk();
+        bulk.doc(new StringBuilder("{"), 0);
+        bulk.flush();
+
+        MapMatcher shardsOk = matchesMap().entry("total", 1).entry("failed", 0).entry("successful", 1);
+        logger.info("refreshing deep index");
+        Map<?, ?> refresh = responseAsMap(client().performRequest(new Request("POST", "/_refresh")));
+        assertMap(refresh, matchesMap().entry("_shards", shardsOk));
+
+        logger.info("double checking deep index count");
+        Map<?, ?> count = responseAsMap(client().performRequest(new Request("POST", "/deep/_count")));
+        assertMap(count, matchesMap().entry("_shards", shardsOk.entry("skipped", 0)).entry("count", TOTAL_DOCS));
+
+        logger.info("deep index ready for test");
+    }
+
+    private String field(String prefix, int field) {
+        return String.format(Locale.ROOT, "%s%03d", prefix, field);
+    }
+
+    @Override
+    protected boolean preserveClusterUponCompletion() {
+        return true;
+    }
+
+    class Bulk {
+        private static final int BULK_SIZE = Math.toIntExact(ByteSizeValue.ofMb(2).getBytes());
+
+        StringBuilder bulk = new StringBuilder();
+        int current = 0;
+        int total = 0;
+        long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");
+
+        void doc(StringBuilder doc, int field) throws IOException {
+            if (field != 0) {
+                doc.append(',');
+            }
+            int len = doc.length();
+            for (int value = 0; value < VALUE_COUNT; value++) {
+                doc.append('"').append(field("kwd", field)).append("\":\"").append(value).append('"');
+                if (field == DEPTH - 1) {
+                    doc.append(",\"@timestamp\":").append(timestamp).append('}');
+                    timestamp += TimeValue.timeValueMinutes(1).millis();
+                    addToBulk(doc);
+                } else {
+                    doc(doc, field + 1);
+                }
+                doc.setLength(len);
+            }
+        }
+
+        void addToBulk(StringBuilder doc) throws IOException {
+            current++;
+            total++;
+            bulk.append("{\"index\":{}}\n");
+            bulk.append(doc).append('\n');
+            if (bulk.length() > BULK_SIZE) {
+                flush();
+            }
+        }
+
+        void flush() throws IOException {
+            logger.info(
+                "Flushing to deep {} docs/{}. Total {}% {}/{}",
+                current,
+                ByteSizeValue.ofBytes(bulk.length()),
+                String.format(Locale.ROOT, "%04.1f", 100.0 * total / TOTAL_DOCS),
+                total,
+                TOTAL_DOCS
+            );
+            Request request = new Request("POST", "/deep/_bulk");
+            request.setJsonEntity(bulk.toString());
+            Map<?, ?> response = responseAsMap(client().performRequest(request));
+            assertMap(response, matchesMap().extraOk().entry("errors", false));
+            bulk.setLength(0);
+            current = 0;
+        }
+    }
+
+    private void setTimeout(Request request) {
+        RequestConfig.Builder config = RequestConfig.custom();
+        config.setSocketTimeout(Math.toIntExact(TIMEOUT.millis()));
+        request.setOptions(request.getOptions().toBuilder().setRequestConfig(config.build()));
+    }
+
+    private void assertNoSearchesRunning() throws Exception {
+        Request tasks = new Request("GET", "/_tasks");
+        tasks.addParameter("actions", "*search");
+        tasks.addParameter("detailed", "");
+        assertBusy(() -> {
+            Map<?, ?> response = responseAsMap(client().performRequest(tasks));
+            // If there are running searches the map in `nodes` is non-empty.
+            assertMap(response, matchesMap().entry("nodes", matchesMap()));
+        });
+    }
+}