CHANGELOG.md (1 change: 1 addition & 0 deletions)
@@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Query planning to determine flush mode for streaming aggregations ([#19488](https://github.com/opensearch-project/OpenSearch/pull/19488))
- Harden the circuit breaker and failure handling logic in query result consumer ([#19396](https://github.com/opensearch-project/OpenSearch/pull/19396))
- Add streaming cardinality aggregator ([#19484](https://github.com/opensearch-project/OpenSearch/pull/19484))
- Disable request cache for streaming aggregation queries ([#19520](https://github.com/opensearch-project/OpenSearch/pull/19520))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
IndicesService.java
@@ -1910,6 +1910,12 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
return false;
}

// Streaming search results cannot be cached: data nodes never compute and reduce the
// full result set per shard; the coordinator performs the reduce over the batched
// results arriving from the shards.
if (context.isStreamSearch()) {
return false;
}

// We cannot cache with DFS because results depend not only on the content of the index but also
// on the overridden statistics. So if you ran two queries on the same index with different stats
// (because another shard was updated) you would get wrong results because of the scores
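
Editor's sketch (not from this PR): the new guard runs before the DFS-statistics rejection above, so a streaming request short-circuits out of the cacheability checks entirely. Condensed under an assumed method shape:

// Hypothetical condensation; only the streaming guard is taken from the diff
// above, the remaining canCache checks are elided.
private static boolean cacheableUnderStreaming(SearchContext context) {
    if (context.isStreamSearch()) {
        return false; // batched per-segment results are reduced on the coordinator, never per shard
    }
    return true; // DFS and the other existing checks would follow here
}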
TermsAggregatorFactory.java
@@ -120,8 +120,7 @@ public Aggregator build(
}
if (execution == null) {
// Check if streaming is enabled and flush mode allows it (null means not yet evaluated)
-FlushMode flushMode = context.getFlushMode();
-if (context.isStreamSearch() && (flushMode == null || flushMode == FlushMode.PER_SEGMENT)) {
+if (context.isStreamSearch() && (context.getFlushMode() == null || context.getFlushMode() == FlushMode.PER_SEGMENT)) {
return createStreamStringTermsAggregator(
name,
factories,
@@ -231,8 +230,7 @@ public Aggregator build(
}
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
}
-FlushMode flushMode = context.getFlushMode();
-if (context.isStreamSearch() && (flushMode == null || flushMode == FlushMode.PER_SEGMENT)) {
+if (context.isStreamSearch() && (context.getFlushMode() == null || context.getFlushMode() == FlushMode.PER_SEGMENT)) {
return createStreamNumericTermsAggregator(
name,
factories,
@@ -373,7 +371,7 @@ static SubAggCollectionMode pickSubAggCollectMode(AggregatorFactories factories,
// We expect to return all buckets so delaying them won't save any time
return SubAggCollectionMode.DEPTH_FIRST;
}
-if (context.isStreamSearch()) {
+if (context.isStreamSearch() && (context.getFlushMode() == null || context.getFlushMode() == FlushMode.PER_SEGMENT)) {
return SubAggCollectionMode.DEPTH_FIRST;
}
if (maxOrd == -1 || maxOrd > expectedSize) {
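
Editor's note: the stream-eligibility predicate is now spelled out inline at each call site. A possible consolidation, an assumption on my part rather than anything in this PR, that keeps the null-means-not-yet-resolved convention in one place:

// Hypothetical helper; the predicate and the null-as-unresolved convention
// are copied from the call sites above.
static boolean streamsPerSegment(SearchContext context) {
    FlushMode mode = context.getFlushMode();
    return context.isStreamSearch() && (mode == null || mode == FlushMode.PER_SEGMENT);
}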
CardinalityAggregatorFactory.java
@@ -42,6 +42,7 @@
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.streaming.FlushMode;

import java.io.IOException;
import java.util.Locale;
@@ -104,7 +105,8 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
-if (searchContext.isStreamSearch()) {
+if (searchContext.isStreamSearch()
+    && (searchContext.getFlushMode() == null || searchContext.getFlushMode() == FlushMode.PER_SEGMENT)) {
return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
}
return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
@@ -117,7 +119,8 @@ protected Aggregator doCreateInternal(
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
-if (searchContext.isStreamSearch()) {
+if (searchContext.isStreamSearch()
+    && (searchContext.getFlushMode() == null || searchContext.getFlushMode() == FlushMode.PER_SEGMENT)) {
return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
}
return queryShardContext.getValuesSourceRegistry()
MaxAggregator.java
@@ -57,6 +57,8 @@
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.streaming.Streamable;
import org.opensearch.search.streaming.StreamingCostMetrics;

import java.io.IOException;
import java.util.Arrays;
@@ -71,7 +73,7 @@
*
* @opensearch.internal
*/
-class MaxAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector {
+class MaxAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector, Streamable {

final ValuesSource.Numeric valuesSource;
final DocValueFormat formatter;
@@ -280,4 +282,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
public void doReset() {
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
}

@Override
public StreamingCostMetrics getStreamingCostMetrics() {
return new StreamingCostMetrics(true, 1, 1, 1, 1);
}
}
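
Editor's sketch: the metrics record is only exercised here through its constructor and accessors. This is the shape those call sites imply, with field order and types assumed rather than read from the actual declaration in org.opensearch.search.streaming:

// Assumed shape; only the (true, 1, 1, 1, 1) constructor call, the five
// accessors used in the tests below, and nonStreamable() are confirmed by this diff.
public record StreamingCostMetrics(
    boolean streamable,
    long topNSize,
    long estimatedBucketCount,
    long segmentCount,
    long estimatedDocCount
) {
    public static StreamingCostMetrics nonStreamable() {
        return new StreamingCostMetrics(false, 0, 0, 0, 0);
    }
}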
MinAggregator.java
@@ -57,6 +57,8 @@
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.streaming.Streamable;
import org.opensearch.search.streaming.StreamingCostMetrics;

import java.io.IOException;
import java.util.Map;
@@ -70,7 +72,7 @@
*
* @opensearch.internal
*/
-class MinAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector {
+class MinAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector, Streamable {
private static final int MAX_BKD_LOOKUPS = 1024;

final ValuesSource.Numeric valuesSource;
@@ -271,4 +273,14 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
(bucket, metricValue) -> mins.set(bucket, Math.min(mins.get(bucket), NumericUtils.sortableLongToDouble(metricValue)))
);
}

@Override
public void doReset() {
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
}

@Override
public StreamingCostMetrics getStreamingCostMetrics() {
return new StreamingCostMetrics(true, 1, 1, 1, 1);
}
}
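
Editor's note: MinAggregator mirrors MaxAggregator here. doReset restores the identity element of each operation (positive infinity for min, negative infinity for max), so a reset aggregator re-derives the extremum from scratch on the next batch, and the all-ones cost metrics advertise constant streaming overhead. The Streamable contract, as far as this diff reveals it:

// Inferred from the cast in ProfilingAggregator below; the real interface
// may declare additional members.
public interface Streamable {
    StreamingCostMetrics getStreamingCostMetrics();
}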
ProfilingAggregator.java
@@ -165,6 +165,10 @@ public static Aggregator unwrap(Aggregator agg) {
return agg;
}

public Aggregator getDelegate() {
return delegate;
}

@Override
public StreamingCostMetrics getStreamingCostMetrics() {
return delegate instanceof Streamable ? ((Streamable) delegate).getStreamingCostMetrics() : StreamingCostMetrics.nonStreamable();
FlushModeResolver.java
@@ -16,6 +16,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.search.aggregations.AggregatorBase;
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.profile.aggregation.ProfilingAggregator;

/**
* Analyzes collector trees to determine optimal {@link FlushMode} for streaming aggregations.
@@ -147,6 +148,9 @@ private static Collector[] getChildren(Collector collector) {
if (collector instanceof MultiBucketCollector) {
return ((MultiBucketCollector) collector).getCollectors();
}
if (collector instanceof ProfilingAggregator) {
return getChildren(((ProfilingAggregator) collector).getDelegate());
}
return new Collector[0];
}
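
Editor's note: with profiling enabled, aggregators are wrapped in ProfilingAggregator, so without this branch the resolver would see a childless node and stop descending, misclassifying the subtree. A sketch of the kind of recursive walk the fix keeps intact (hypothetical method, not taken from this PR):

// Walks a collector tree via getChildren(...) above; a single
// non-streamable node disqualifies the whole subtree.
private static boolean subtreeStreamable(Collector collector) {
    if (collector instanceof Streamable s && !s.getStreamingCostMetrics().streamable()) {
        return false;
    }
    for (Collector child : getChildren(collector)) {
        if (!subtreeStreamable(child)) {
            return false;
        }
    }
    return true;
}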

MaxAggregatorTests.java
@@ -996,4 +996,62 @@ public void testScriptCaching() throws Exception {
directory.close();
unmappedDirectory.close();
}

public void testStreamingCostMetrics() throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1)));
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
MaxAggregationBuilder aggregationBuilder = new MaxAggregationBuilder("max").field("value");

MaxAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);

// Test streaming cost metrics
org.opensearch.search.streaming.StreamingCostMetrics metrics = aggregator.getStreamingCostMetrics();
assertNotNull(metrics);
assertTrue("MaxAggregator should be streamable", metrics.streamable());
assertEquals(1, metrics.topNSize());
assertEquals(1, metrics.estimatedBucketCount());
assertEquals(1, metrics.segmentCount());
assertEquals(1, metrics.estimatedDocCount());

indexReader.close();
directory.close();
}

public void testDoReset() throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 5)));
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 10)));
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
MaxAggregationBuilder aggregationBuilder = new MaxAggregationBuilder("max").field("value");

MaxAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);

aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();

InternalMax result1 = (InternalMax) aggregator.buildAggregation(0L);
assertEquals(10.0, result1.getValue(), 0);

aggregator.doReset();

InternalMax result2 = (InternalMax) aggregator.buildAggregation(0L);
assertEquals(Double.NEGATIVE_INFINITY, result2.getValue(), 0);

indexReader.close();
directory.close();
}
}
MinAggregatorTests.java
@@ -777,4 +777,62 @@ protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
return new MinAggregationBuilder("foo").field(fieldName);
}

public void testStreamingCostMetrics() throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1)));
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
MinAggregationBuilder aggregationBuilder = new MinAggregationBuilder("min").field("value");

MinAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);

// Test streaming cost metrics
org.opensearch.search.streaming.StreamingCostMetrics metrics = aggregator.getStreamingCostMetrics();
assertNotNull(metrics);
assertTrue("MinAggregator should be streamable", metrics.streamable());
assertEquals(1, metrics.topNSize());
assertEquals(1, metrics.estimatedBucketCount());
assertEquals(1, metrics.segmentCount());
assertEquals(1, metrics.estimatedDocCount());

indexReader.close();
directory.close();
}

public void testDoReset() throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 5)));
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 10)));
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
MinAggregationBuilder aggregationBuilder = new MinAggregationBuilder("min").field("value");

MinAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);

aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();

InternalMin result1 = (InternalMin) aggregator.buildAggregation(0L);
assertEquals(5.0, result1.getValue(), 0);

aggregator.doReset();

InternalMin result2 = (InternalMin) aggregator.buildAggregation(0L);
assertEquals(Double.POSITIVE_INFINITY, result2.getValue(), 0);

indexReader.close();
directory.close();
}
}