3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -78,7 +78,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'system call filter not installed' caused when network.host: 0.0.0.0 ([#18309](https://github.com/opensearch-project/OpenSearch/pull/18309))
- Fix MatrixStatsAggregator reuse when mode parameter changes ([#18242](https://github.com/opensearch-project/OpenSearch/issues/18242))
- Replace the deprecated construction method of TopScoreDocCollectorManager with the new method ([#18395](https://github.com/opensearch-project/OpenSearch/pull/18395))
-- Fixed Approximate Framework regression with Lucene 10.2.1 by updating `intersectRight` BKD walk and `IntRef` visit method ([#18358](https://github.com/opensearch-project/OpenSearch/issues/18358
+- Fixed Approximate Framework regression with Lucene 10.2.1 by updating `intersectRight` BKD walk and `IntRef` visit method ([#18358](https://github.com/opensearch-project/OpenSearch/issues/18358))
+- Add task cancellation checks in aggregators ([#18426](https://github.com/opensearch-project/OpenSearch/pull/18426))

### Security

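For readers skimming the diff: the heart of this change is a new checkCancelled() hook on AggregatorBase that reads SearchContext.isCancelled() and throws OpenSearchRejectedExecutionException, plus calls to that hook at the entry of every expensive bucket-building path. Below is a minimal, self-contained sketch of the pattern; SimpleContext and SimpleAggregator are illustrative stand-ins, not OpenSearch classes, and the real hook appears later in this diff.

// Illustrative sketch only: SimpleContext and SimpleAggregator stand in for
// OpenSearch's SearchContext and AggregatorBase.
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class CancellationSketch {

    static final class SimpleContext {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        void cancel() {
            cancelled.set(true);
        }

        boolean isCancelled() {
            return cancelled.get();
        }
    }

    static class SimpleAggregator {
        private final SimpleContext context;

        SimpleAggregator(SimpleContext context) {
            this.context = context;
        }

        // Mirrors the new AggregatorBase.checkCancelled(): a cheap flag read
        // that throws instead of letting the build phase continue.
        protected void checkCancelled() {
            if (context.isCancelled()) {
                throw new RejectedExecutionException("The query has been cancelled");
            }
        }

        long[] buildAggregations(long[] owningBucketOrds) {
            checkCancelled(); // once on entry, before any expensive work
            long[] results = new long[owningBucketOrds.length];
            for (int i = 0; i < owningBucketOrds.length; i++) {
                checkCancelled(); // and inside per-bucket loops, as this PR does
                results[i] = owningBucketOrds[i];
            }
            return results;
        }
    }

    public static void main(String[] args) {
        SimpleContext context = new SimpleContext();
        SimpleAggregator aggregator = new SimpleAggregator(context);
        context.cancel(); // simulate the task being cancelled mid-request
        try {
            aggregator.buildAggregations(new long[] { 0L, 1L });
        } catch (RejectedExecutionException e) {
            System.out.println("aborted: " + e.getMessage());
        }
    }
}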
AggregatorCancellationTests.java (new file)
@@ -0,0 +1,200 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.xcontent.ObjectParser;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalSum;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class AggregatorCancellationTests extends AggregatorTestCase {

@Override
protected List<SearchPlugin> getSearchPlugins() {
return List.of(new SearchPlugin() {
@Override
public List<AggregationSpec> getAggregations() {
return List.of(
new AggregationSpec(
"custom_cancellable",
CustomCancellableAggregationBuilder::new,
CustomCancellableAggregationBuilder.PARSER
)
);
}
});
}

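// The terms sizes below (10, 50000, 20000) force a wide, expensive bucket
// tree, and the custom top-level aggregation simulates a task that is already
// cancelled, so the search must abort instead of building all those buckets.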
public void testNestedAggregationCancellation() throws IOException {
try (Directory directory = newDirectory()) {
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);

// Create documents
for (int i = 0; i < 100; i++) {
Document doc = new Document();
doc.add(new SortedSetDocValuesField("category", new BytesRef("cat" + (i % 10))));
doc.add(new SortedSetDocValuesField("subcategory", new BytesRef("subcat" + (i % 50))));
doc.add(new SortedSetDocValuesField("brand", new BytesRef("brand" + (i % 20))));
doc.add(new SortedNumericDocValuesField("value", i));
indexWriter.addDocument(doc);
}
indexWriter.close();

try (IndexReader reader = DirectoryReader.open(directory)) {
IndexSearcher searcher = newIndexSearcher(reader);

// Create nested aggregations with our custom cancellable agg
CustomCancellableAggregationBuilder aggBuilder = new CustomCancellableAggregationBuilder("test_agg").subAggregation(
new TermsAggregationBuilder("categories").field("category")
.size(10)
.subAggregation(
new TermsAggregationBuilder("subcategories").field("subcategory")
.size(50000)
.subAggregation(new TermsAggregationBuilder("brands").field("brand").size(20000))
)
);

expectThrows(
OpenSearchRejectedExecutionException.class,
() -> searchAndReduce(
searcher,
new MatchAllDocsQuery(),
aggBuilder,
keywordField("category"),
keywordField("subcategory"),
keywordField("brand")
)
);
}
}
}

private static class CustomCancellableAggregationBuilder extends AbstractAggregationBuilder<CustomCancellableAggregationBuilder> {

public static final String NAME = "custom_cancellable";

public static final ObjectParser<CustomCancellableAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(
NAME,
CustomCancellableAggregationBuilder::new
);

CustomCancellableAggregationBuilder(String name) {
super(name);
}

public CustomCancellableAggregationBuilder(StreamInput in) throws IOException {
super(in);
}

@Override
protected AggregatorFactory doBuild(
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subfactoriesBuilder
) throws IOException {
return new AggregatorFactory(name, queryShardContext, parent, subfactoriesBuilder, metadata) {
@Override
protected Aggregator createInternal(
SearchContext searchContext,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
return new CustomCancellableAggregator(
name,
searchContext,
parent,
subfactoriesBuilder.build(queryShardContext, this),
null
);
}
};
}

@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}

@Override
public CustomCancellableAggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
return new CustomCancellableAggregationBuilder(getName()).setMetadata(metadata).subAggregations(factoriesBuilder);
}

@Override
public String getType() {
return NAME;
}

@Override
public BucketCardinality bucketCardinality() {
return BucketCardinality.NONE;
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
// Nothing to write
}
}

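// Test-only aggregation whose aggregator always reports cancellation; it lets
// the test above verify that a cancellation observed mid-search propagates out
// of deeply nested sub-aggregations.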
private static class CustomCancellableAggregator extends AggregatorBase {

CustomCancellableAggregator(
String name,
SearchContext context,
Aggregator parent,
AggregatorFactories factories,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, CardinalityUpperBound.NONE, metadata);
}

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
checkCancelled();
return LeafBucketCollector.NO_OP_COLLECTOR;
}

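// Simulates a task that was cancelled mid-flight: throw unconditionally
// instead of consulting SearchContext.isCancelled() the way the production
// AggregatorBase.checkCancelled() does.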
@Override
protected void checkCancelled() {
throw new OpenSearchRejectedExecutionException("The request has been cancelled");
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation internalAggregation = new InternalSum(name(), 0.0, DocValueFormat.RAW, metadata());
return new InternalAggregation[] { internalAggregation };
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalSum(name(), 0.0, DocValueFormat.RAW, metadata());
}
}
}
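Note on the test design: no real SearchShardTask is cancelled anywhere above. Overriding checkCancelled() to throw unconditionally makes the cancellation path deterministic, and expectThrows verifies that the OpenSearchRejectedExecutionException escapes searchAndReduce instead of being swallowed during bucket building.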
@@ -1032,7 +1032,7 @@ public SearchShardTask getTask() {

@Override
public boolean isCancelled() {
return task.isCancelled();
return task != null && task.isCancelled();
}

@Override
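The null guard makes isCancelled() safe for contexts that never had a task attached (the aggregator test harness, for instance); previously task.isCancelled() would have thrown a NullPointerException whenever task was null.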
server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java
@@ -37,6 +37,7 @@
import org.apache.lucene.search.ScoreMode;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
@@ -328,4 +329,10 @@
public String toString() {
return name;
}

protected void checkCancelled() {
if (context.isCancelled()) {
throw new OpenSearchRejectedExecutionException("The query has been cancelled");
}
}
}
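Every call site added in the files below funnels through this one hook, so the policy (which flag is read and which exception is thrown) lives in a single place, and tests get a seam to override, which is exactly what CustomCancellableAggregator above does.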
@@ -235,9 +235,11 @@ protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
* array of ordinals
*/
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
checkCancelled();
beforeBuildingBuckets(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
checkCancelled();
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
}
InternalAggregations[] result = new InternalAggregations[bucketOrdsToCollect.length];
@@ -323,6 +325,7 @@ protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
BucketBuilderForFixedCount<B> bucketBuilder,
Function<List<B>, InternalAggregation> resultBuilder
) throws IOException {
checkCancelled();
int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
long[] bucketOrdsToCollect = new long[totalBuckets];
int bucketOrdIdx = 0;
@@ -373,6 +376,7 @@ protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] ow
* `consumeBucketsAndMaybeBreak(owningBucketOrds.length)`
* here but we don't because single bucket aggs never have.
*/
checkCancelled();
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Expand Down Expand Up @@ -403,6 +407,7 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
BucketBuilderForVariable<B> bucketBuilder,
ResultBuilderForVariable<B> resultBuilder
) throws IOException {
checkCancelled();
long totalOrdsToCollect = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
totalOrdsToCollect += bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
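BucketsAggregator now checks at two granularities: once on entry to each build method, which catches a cancellation before any work starts, and again inside the per-sub-aggregator loop of buildSubAggsForBuckets, which bounds how long a cancelled request keeps running when there are many sub-aggregations. The per-bucket checks in the histogram, range, and terms aggregators below follow the same reasoning.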
server/src/main/java/org/opensearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
@@ -208,6 +208,7 @@

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
checkCancelled();

// Buckets are ordered into groups - [keyed filters] [key1&key2 intersects]
int maxOrd = owningBucketOrds.length * totalNumKeys;
int totalBucketsToBuild = 0;
@@ -260,6 +260,7 @@ protected void doPostCollection() throws IOException {

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
checkCancelled();
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
if (deferredCollectors != NO_OP_COLLECTOR) {
@@ -200,6 +200,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
owningBucketOrds,
keys.length + (showOtherBucket ? 1 : 0),
(offsetInOwningOrd, docCount, subAggregationResults) -> {
checkCancelled();
if (offsetInOwningOrd < keys.length) {
return new InternalFilters.InternalBucket(keys[offsetInOwningOrd], docCount, subAggregationResults, keyed);
}
@@ -104,10 +104,12 @@ public AbstractHistogramAggregator(
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
checkCancelled();
double roundKey = Double.longBitsToDouble(bucketValue);
double key = roundKey * interval + offset;
return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults);
}, (owningBucketOrd, buckets) -> {
checkCancelled();
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

@@ -285,6 +285,7 @@ protected final InternalAggregation[] buildAggregations(
subAggregationResults
),
(owningBucketOrd, buckets) -> {
checkCancelled();
// the contract of the histogram aggregation is that shards must return
// buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
@@ -733,6 +734,7 @@ private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucke
private void rebucket() {
rebucketCount++;
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
checkCancelled();
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays());
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
@@ -207,6 +207,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
subAggregationResults
),
(owningBucketOrd, buckets) -> {
checkCancelled();
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

@@ -585,6 +585,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I

List<InternalVariableWidthHistogram.Bucket> buckets = new ArrayList<>(numClusters);
for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) {
checkCancelled();
buckets.add(collector.buildBucket(bucketOrd, subAggregationResults[bucketOrd]));
}

@@ -469,6 +469,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
owningBucketOrds,
ranges.length,
(offsetInOwningOrd, docCount, subAggregationResults) -> {
checkCancelled();
Range range = ranges[offsetInOwningOrd];
return rangeFactory.createBucket(range.key, range.from, range.to, docCount, subAggregationResults, keyed, format);
},
@@ -805,6 +805,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
B[][] topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
checkCancelled();
final int size;
if (localBucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
@@ -133,6 +133,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
long offset = 0;
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
try (LongHash bucketsInThisOwningBucketToCollect = new LongHash(1, context.bigArrays())) {
checkCancelled();
filters[owningOrdIdx] = newFilter();
List<LongRareTerms.Bucket> builtBuckets = new ArrayList<>();
LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);
@@ -249,6 +249,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
checkCancelled();
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
int size = (int) Math.min(bucketOrds.size(), localBucketCountThresholds.getRequiredSize());

@@ -125,6 +125,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][];
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
checkCancelled();
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

@@ -259,6 +259,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
checkCancelled();
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

@@ -136,6 +136,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
long offset = 0;
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
try (BytesRefHash bucketsInThisOwningBucketToCollect = new BytesRefHash(context.bigArrays())) {
checkCancelled();
filters[owningOrdIdx] = newFilter();
List<StringRareTerms.Bucket> builtBuckets = new ArrayList<>();
BytesKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);