1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Optimization in String Terms Aggregation query for Large Bucket Counts ([#18732](https://github.com/opensearch-project/OpenSearch/pull/18732))
- New cluster setting search.query.max_query_string_length ([#19491](https://github.com/opensearch-project/OpenSearch/pull/19491))
- Add `StreamNumericTermsAggregator` to allow numeric term aggregation streaming ([#19335](https://github.com/opensearch-project/OpenSearch/pull/19335))
- Query planning to determine flush mode for streaming aggregations ([#19488](https://github.com/opensearch-project/OpenSearch/pull/19488))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))

Large diffs are not rendered by default.

@@ -165,6 +165,7 @@
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.streaming.FlushModeResolver;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
@@ -811,6 +812,9 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
SearchService.QUERY_REWRITING_ENABLED_SETTING,
SearchService.QUERY_REWRITING_TERMS_THRESHOLD_SETTING,
FlushModeResolver.STREAMING_MAX_ESTIMATED_BUCKET_COUNT,
FlushModeResolver.STREAMING_MIN_CARDINALITY_RATIO,
FlushModeResolver.STREAMING_MIN_ESTIMATED_BUCKET_COUNT,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,
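The three dynamic settings registered above are declared in FlushModeResolver, whose diff is not shown in this excerpt. A minimal sketch of how such settings are typically declared with OpenSearch's Setting API follows; the keys, defaults, and bounds here are assumptions for illustration, not the actual values:

```java
import org.opensearch.common.settings.Setting;

// Hypothetical declarations -- the real keys and defaults live in FlushModeResolver.
public static final Setting<Long> STREAMING_MAX_ESTIMATED_BUCKET_COUNT = Setting.longSetting(
    "search.aggregations.streaming.max_estimated_bucket_count", // assumed key
    100_000L,                                                   // assumed default
    1L,                                                         // assumed minimum
    Setting.Property.NodeScope,
    Setting.Property.Dynamic
);
public static final Setting<Double> STREAMING_MIN_CARDINALITY_RATIO = Setting.doubleSetting(
    "search.aggregations.streaming.min_cardinality_ratio",     // assumed key
    0.01,                                                       // assumed default
    0.0,                                                        // assumed minimum
    Setting.Property.NodeScope,
    Setting.Property.Dynamic
);
public static final Setting<Long> STREAMING_MIN_ESTIMATED_BUCKET_COUNT = Setting.longSetting(
    "search.aggregations.streaming.min_estimated_bucket_count", // assumed key
    1_000L,                                                     // assumed default
    0L,                                                         // assumed minimum
    Setting.Property.NodeScope,
    Setting.Property.Dynamic
);
```

Registering them in the cluster settings list, as the hunk above does, is what makes them adjustable at runtime through the cluster settings API.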
@@ -103,6 +103,7 @@
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.streaming.FlushMode;
import org.opensearch.search.suggest.SuggestionSearchContext;

import java.io.IOException;
@@ -130,6 +131,9 @@
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_NONE;
import static org.opensearch.search.SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
import static org.opensearch.search.streaming.FlushModeResolver.STREAMING_MAX_ESTIMATED_BUCKET_COUNT;
import static org.opensearch.search.streaming.FlushModeResolver.STREAMING_MIN_CARDINALITY_RATIO;
import static org.opensearch.search.streaming.FlushModeResolver.STREAMING_MIN_ESTIMATED_BUCKET_COUNT;

/**
* The main search context used during search phase
@@ -219,8 +223,9 @@ final class DefaultSearchContext extends SearchContext {
private final int bucketSelectionStrategyFactor;
private final boolean keywordIndexOrDocValuesEnabled;

- private final boolean isStreamSearch;
+ private boolean isStreamSearch;
private StreamSearchChannelListener listener;
private final SetOnce<FlushMode> cachedFlushMode = new SetOnce<>();

DefaultSearchContext(
ReaderContext readerContext,
@@ -1277,4 +1282,33 @@ public StreamSearchChannelListener getStreamChannelListener() {
public boolean isStreamSearch() {
return isStreamSearch;
}

/**
 * Returns the flush mode resolved for this search context, or {@code null} if the
 * streaming cost analysis has not yet run.
 */
@Override
public FlushMode getFlushMode() {
return cachedFlushMode.get();
}

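/**
 * Records the resolved flush mode if none has been set yet.
 *
 * @return {@code true} if this call stored the value, {@code false} if another
 *         thread (or an earlier call) had already set it
 */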
@Override
public boolean setFlushModeIfAbsent(FlushMode flushMode) {
return cachedFlushMode.trySet(flushMode);
}

@Override
public long getStreamingMaxEstimatedBucketCount() {
return clusterService.getClusterSettings().get(STREAMING_MAX_ESTIMATED_BUCKET_COUNT);
}

@Override
public double getStreamingMinCardinalityRatio() {
return clusterService.getClusterSettings().get(STREAMING_MIN_CARDINALITY_RATIO);
}

@Override
public long getStreamingMinEstimatedBucketCount() {
return clusterService.getClusterSettings().get(STREAMING_MIN_ESTIMATED_BUCKET_COUNT);
}
}
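The flush-mode cache above leans on Lucene's SetOnce for first-writer-wins semantics, which is what keeps concurrent segment slices from racing to different decisions. A small illustration of the contract the code relies on (not from the PR, just SetOnce behavior):

```java
import org.apache.lucene.util.SetOnce;

void demo() {
    SetOnce<FlushMode> cached = new SetOnce<>();
    boolean first = cached.trySet(FlushMode.PER_SEGMENT);  // true: value recorded
    boolean second = cached.trySet(FlushMode.PER_SHARD);   // false: already set, not overwritten
    assert first && !second;
    assert cached.get() == FlushMode.PER_SEGMENT;          // the first writer wins
}
```

Because trySet never overwrites, every caller of setFlushModeIfAbsent afterwards observes the same FlushMode for the remainder of the query.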
@@ -19,6 +19,8 @@
import java.util.List;
import java.util.Objects;

import static org.opensearch.search.aggregations.AggregatorTreeEvaluator.evaluateAndRecreateIfNeeded;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
* aggregation operators
@@ -42,7 +44,7 @@ public abstract class AggregationCollectorManager implements CollectorManager<Co

@Override
public Collector newCollector() throws IOException {
- final Collector collector = createCollector(aggProvider.apply(context));
+ final Collector collector = createCollector(context, aggProvider);
// For aggregations we should never have a NO_OP_COLLECTOR
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
@@ -68,8 +70,13 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
return new AggregationReduceableSearchResult(internalAggregations);
}

- static Collector createCollector(List<Aggregator> collectors) throws IOException {
-     Collector collector = MultiBucketCollector.wrap(collectors);
+ static Collector createCollector(SearchContext searchContext, CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider)
+     throws IOException {
+     Collector collector = MultiBucketCollector.wrap(aggProvider.apply(searchContext));

// Evaluate streaming decision and potentially recreate tree
collector = evaluateAndRecreateIfNeeded(collector, searchContext, aggProvider);

((BucketCollector) collector).preCollection();
return collector;
}
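The signature change is the crux of this file: createCollector now receives the aggregator factory rather than an already-built List&lt;Aggregator&gt;, so the tree can be built a second time if the streaming evaluation rejects the first one. A hedged sketch of a call site; the createTopLevelAggregators accessor chain is an assumption for illustration:

```java
// Sketch: the provider stays a factory, so AggregatorTreeEvaluator can invoke it
// again to rebuild the collector tree once the flush mode has been pinned.
CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider =
    sc -> sc.aggregations().factories().createTopLevelAggregators(sc); // assumed accessor chain
Collector collector = AggregationCollectorManager.createCollector(searchContext, aggProvider);
```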
@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.streaming.FlushMode;
import org.opensearch.search.streaming.FlushModeResolver;

import java.io.IOException;
import java.util.List;

/**
* Performs cost-benefit analysis on aggregator trees to optimize streaming decisions.
*
* <p>Evaluates whether streaming aggregations will be beneficial by analyzing the entire
* collector tree using {@link FlushModeResolver}. When streaming is determined to be
* inefficient, recreates the aggregator tree with traditional (non-streaming) aggregators.
* Decisions are cached to ensure consistency across concurrent segment processing.
*
* @opensearch.experimental
*/
@ExperimentalApi
public final class AggregatorTreeEvaluator {

private AggregatorTreeEvaluator() {}

/**
* Analyzes collector tree and recreates it with optimal aggregator types.
*
* <p>Determines the appropriate {@link FlushMode} for the collector tree and recreates
* aggregators if streaming is not beneficial. Should be called after initial aggregator
* creation but before query execution.
*
* @param collector the root collector to analyze
* @param searchContext search context for caching and configuration
* @param aggProvider factory function to recreate aggregators when needed
* @return optimized collector (original if streaming, recreated if traditional)
* @throws IOException if aggregator recreation fails
*/
public static Collector evaluateAndRecreateIfNeeded(
Collector collector,
SearchContext searchContext,
CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider
) throws IOException {
if (!searchContext.isStreamSearch()) {
return collector;
}

FlushMode flushMode = getFlushMode(collector, searchContext);

if (flushMode == FlushMode.PER_SEGMENT) {
return collector;
} else {
return MultiBucketCollector.wrap(aggProvider.apply(searchContext));
}
}

/**
* Resolves flush mode using cached decision or on-demand evaluation.
*
* @param collector the collector to evaluate
* @param searchContext search context for decision caching
* @return the resolved flush mode for this query
*/
private static FlushMode getFlushMode(Collector collector, SearchContext searchContext) {
FlushMode cached = searchContext.getFlushMode();
if (cached != null) {
return cached;
}

long maxBucketCount = searchContext.getStreamingMaxEstimatedBucketCount();
double minCardinalityRatio = searchContext.getStreamingMinCardinalityRatio();
long minBucketCount = searchContext.getStreamingMinEstimatedBucketCount();
FlushMode mode = FlushModeResolver.resolve(collector, FlushMode.PER_SHARD, maxBucketCount, minCardinalityRatio, minBucketCount);

if (!searchContext.setFlushModeIfAbsent(mode)) {
// Another thread resolved the flush mode first; defer to the value already recorded.
FlushMode existingMode = searchContext.getFlushMode();
return existingMode != null ? existingMode : mode;
}

return mode;
}

}
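FlushModeResolver itself is part of the collapsed diff above, so only its inputs are visible here: the root collector, a PER_SHARD fallback, and the three thresholds. Under the assumption that it folds the tree's StreamingCostMetrics into simple threshold checks, the decision plausibly reduces to the following sketch (not the actual implementation):

```java
// Assumed shape of the decision; the real rules live in FlushModeResolver.
static FlushMode resolve(StreamingCostMetrics m, FlushMode fallback,
                         long maxBuckets, double minRatio, long minBuckets) {
    if (!m.streamable()) {
        return fallback;                              // some aggregator in the tree cannot stream
    }
    long buckets = m.estimatedBucketCount();
    if (buckets < minBuckets || buckets > maxBuckets) {
        return fallback;                              // too few buckets to pay off, or too many to flush
    }
    double ratio = m.estimatedDocCount() == 0 ? 0.0 : (double) buckets / m.estimatedDocCount();
    return ratio >= minRatio ? FlushMode.PER_SEGMENT : fallback;
}
```

Whatever the exact rules, the contract visible in this file is fixed: PER_SEGMENT keeps the streaming tree, and anything else triggers a rebuild through aggProvider.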
@@ -9,12 +9,15 @@
package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Numbers;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.fielddata.FieldData;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.NumberFieldMapper.NumberFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -28,6 +31,8 @@
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.streaming.Streamable;
import org.opensearch.search.streaming.StreamingCostMetrics;

import java.io.IOException;
import java.util.ArrayList;
@@ -45,7 +50,7 @@
*
* @opensearch.internal
*/
- public class StreamNumericTermsAggregator extends TermsAggregator {
+ public class StreamNumericTermsAggregator extends TermsAggregator implements Streamable {
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource.Numeric valuesSource;
private final IncludeExclude.LongFilter longFilter;
@@ -533,10 +538,61 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("result_strategy", resultStrategy.describe());
add.accept("total_buckets", bucketOrds == null ? 0 : bucketOrds.size());

StreamingCostMetrics metrics = getStreamingCostMetrics();
add.accept("streaming_enabled", metrics.streamable());
add.accept("streaming_top_n_size", metrics.topNSize());
add.accept("streaming_estimated_buckets", metrics.estimatedBucketCount());
add.accept("streaming_estimated_docs", metrics.estimatedDocCount());
add.accept("streaming_segment_count", metrics.segmentCount());
}

@Override
public void doClose() {
Releasables.close(super::doClose, bucketOrds, resultStrategy);
}

@Override
public StreamingCostMetrics getStreamingCostMetrics() {
try {
String fieldName = valuesSource.getIndexFieldName();
long totalDocsWithField = PointValues.size(context.searcher().getIndexReader(), fieldName);
int segmentCount = context.searcher().getIndexReader().leaves().size();

if (totalDocsWithField == 0) {
return new StreamingCostMetrics(true, bucketCountThresholds.getShardSize(), 0, segmentCount, 0);
}

MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(fieldName);
if (fieldType == null || !(fieldType.unwrap() instanceof NumberFieldType numberFieldType)) {
return StreamingCostMetrics.nonStreamable();
}

Number minPoint = numberFieldType.parsePoint(PointValues.getMinPackedValue(context.searcher().getIndexReader(), fieldName));
Number maxPoint = numberFieldType.parsePoint(PointValues.getMaxPackedValue(context.searcher().getIndexReader(), fieldName));

long maxCardinality = switch (resultStrategy) {
case LongTermsResults ignored -> {
long min = minPoint.longValue();
long max = maxPoint.longValue();
yield Math.max(1, max - min + 1);
}
case DoubleTermsResults ignored -> {
double min = minPoint.doubleValue();
double max = maxPoint.doubleValue();
yield Math.max(1, Math.min((long) (max - min + 1), totalDocsWithField));
}
case UnsignedLongTermsResults ignored -> {
long min = minPoint.longValue();
long max = maxPoint.longValue();
yield Math.max(1, max - min + 1);
}
case null, default -> 1L;
};

return new StreamingCostMetrics(true, bucketCountThresholds.getShardSize(), maxCardinality, segmentCount, totalDocsWithField);
} catch (IOException e) {
return StreamingCostMetrics.nonStreamable();
}
}
}
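The numeric path estimates cardinality from the BKD range: the min/max packed values bound how many distinct terms can exist, so a long field spanning 100..1099 yields max(1, 1099 - 100 + 1) = 1000 estimated buckets even if far fewer values actually occur. A toy restatement of that arithmetic, including the single-value edge case:

```java
// Range-width upper bound, as used by the long and unsigned-long branches above.
static long estimateLongCardinality(long min, long max) {
    return Math.max(1, max - min + 1); // note: max - min can overflow for extreme ranges
}

// estimateLongCardinality(100, 1099) == 1000
// estimateLongCardinality(42, 42)    == 1      (one distinct value)
```

The double branch additionally caps the estimate at totalDocsWithField, since a fractional range width says little about how many distinct doubles are actually present.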
@@ -26,6 +26,8 @@
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.streaming.Streamable;
import org.opensearch.search.streaming.StreamingCostMetrics;

import java.io.IOException;
import java.util.ArrayList;
@@ -40,7 +42,7 @@
/**
* Stream search terms aggregation
*/
- public class StreamStringTermsAggregator extends AbstractStringTermsAggregator {
+ public class StreamStringTermsAggregator extends AbstractStringTermsAggregator implements Streamable {
private SortedSetDocValues sortedDocValuesPerBatch;
private long valueCount;
private final ValuesSource.Bytes.WithOrdinals valuesSource;
@@ -135,6 +137,27 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
});
}

@Override
public StreamingCostMetrics getStreamingCostMetrics() {
try {
List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
long maxCardinality = 0;
long totalDocsWithField = 0;

for (LeafReaderContext leaf : leaves) {
SortedSetDocValues docValues = valuesSource.ordinalsValues(leaf);
if (docValues != null) {
maxCardinality = Math.max(maxCardinality, docValues.getValueCount());
totalDocsWithField += docValues.cost();
}
}

return new StreamingCostMetrics(true, bucketCountThresholds.getShardSize(), maxCardinality, leaves.size(), totalDocsWithField);
} catch (IOException e) {
return StreamingCostMetrics.nonStreamable();
}
}

/**
* Strategy for building results.
*/
@@ -327,5 +350,12 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
add.accept("result_strategy", resultStrategy.describe());
add.accept("segments_with_single_valued_ords", segmentsWithSingleValuedOrds);
add.accept("segments_with_multi_valued_ords", segmentsWithMultiValuedOrds);

StreamingCostMetrics metrics = getStreamingCostMetrics();
add.accept("streaming_enabled", metrics.streamable());
add.accept("streaming_top_n_size", metrics.topNSize());
add.accept("streaming_estimated_buckets", metrics.estimatedBucketCount());
add.accept("streaming_estimated_docs", metrics.estimatedDocCount());
add.accept("streaming_segment_count", metrics.segmentCount());
}
}
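Neither Streamable nor StreamingCostMetrics appears in this excerpt, but the constructor calls and accessors above pin down the record's shape fairly well. A sketch inferred from those call sites; treat the component names and ordering as an educated guess:

```java
// Inferred from call sites such as
// new StreamingCostMetrics(true, shardSize, maxCardinality, segmentCount, totalDocsWithField).
public record StreamingCostMetrics(
    boolean streamable,        // false disables streaming for the whole tree
    int topNSize,              // shard-level size (shard_size) requested by the agg
    long estimatedBucketCount, // upper bound on distinct terms
    int segmentCount,          // leaves that would each trigger a flush
    long estimatedDocCount     // docs carrying the field
) {
    public static StreamingCostMetrics nonStreamable() {
        return new StreamingCostMetrics(false, 0, 0, 0, 0);
    }
}
```

For the string case above, estimatedBucketCount is the largest per-segment ordinal count: taking the max rather than the sum avoids double-counting terms shared across segments, at the cost of underestimating shards whose segments hold disjoint terms.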