Skip to content

Commit e4bccb0

Browse files
committed
[Backport 2.x] [Star Tree] [Search] Resolve Date histogram with metric aggregation using star-tree (#16674)
--------- Signed-off-by: Sandesh Kumar <[email protected]>
1 parent 1c7f719 commit e4bccb0

File tree

19 files changed

+1144
-76
lines changed

19 files changed

+1144
-76
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3636
- Added new Setting property UnmodifiableOnRestore to prevent updating settings on restore snapshot ([#16957](https://github.com/opensearch-project/OpenSearch/pull/16957))
3737
- Introduce Template query ([#16818](https://github.com/opensearch-project/OpenSearch/pull/16818))
3838
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
39+
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))
3940

4041
### Dependencies
4142
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))

server/src/main/java/org/opensearch/common/Rounding.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,10 @@ public void writeTo(StreamOutput out) throws IOException {
271271

272272
public abstract byte id();
273273

274+
public DateTimeUnit unit() {
275+
return null;
276+
}
277+
274278
/**
275279
* A strategy for rounding milliseconds since epoch.
276280
*
@@ -525,6 +529,11 @@ public byte id() {
525529
return ID;
526530
}
527531

532+
@Override
533+
public DateTimeUnit unit() {
534+
return unit;
535+
}
536+
528537
private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
529538
switch (unit) {
530539
case SECOND_OF_MINUTE:

server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,24 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) {
169169
}
170170
}
171171

172+
/**
173+
* Returns the closest valid calendar interval to be used for the search interval
174+
*/
175+
public DateTimeUnitRounding findClosestValidInterval(DateTimeUnitRounding searchInterval) {
176+
DateTimeUnitComparator comparator = new DateTimeUnitComparator();
177+
DateTimeUnitRounding closestValidInterval = null;
178+
179+
// Find the largest interval that is less than or equal to search interval
180+
for (DateTimeUnitRounding interval : sortedCalendarIntervals) {
181+
if (comparator.compare(interval, searchInterval) <= 0) {
182+
closestValidInterval = interval;
183+
} else {
184+
break;
185+
}
186+
}
187+
return closestValidInterval;
188+
}
189+
172190
/**
173191
* Returns a sorted list of dateTimeUnits based on the DateTimeUnitComparator
174192
*/

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

Lines changed: 105 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717
import org.opensearch.common.lucene.Lucene;
1818
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
1919
import org.opensearch.index.codec.composite.CompositeIndexReader;
20+
import org.opensearch.index.compositeindex.datacube.DateDimension;
2021
import org.opensearch.index.compositeindex.datacube.Dimension;
2122
import org.opensearch.index.compositeindex.datacube.Metric;
2223
import org.opensearch.index.compositeindex.datacube.MetricStat;
2324
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
25+
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
26+
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
2427
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
2528
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
2629
import org.opensearch.index.query.MatchAllQueryBuilder;
2730
import org.opensearch.index.query.QueryBuilder;
2831
import org.opensearch.index.query.TermQueryBuilder;
2932
import org.opensearch.search.aggregations.AggregatorFactory;
3033
import org.opensearch.search.aggregations.LeafBucketCollector;
31-
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
34+
import org.opensearch.search.aggregations.StarTreeBucketCollector;
35+
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
3236
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
3337
import org.opensearch.search.aggregations.support.ValuesSource;
3438
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -37,9 +41,10 @@
3741
import org.opensearch.search.startree.StarTreeQueryContext;
3842

3943
import java.io.IOException;
40-
import java.util.HashMap;
4144
import java.util.List;
4245
import java.util.Map;
46+
import java.util.Set;
47+
import java.util.function.BiConsumer;
4348
import java.util.function.Consumer;
4449
import java.util.stream.Collectors;
4550

@@ -74,10 +79,16 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
7479
);
7580

7681
for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
77-
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
78-
if (metricStat == null) {
79-
return null;
82+
// first check whether the aggregation is a supported metric aggregation
83+
if (validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory)) {
84+
continue;
85+
}
86+
87+
// if not a metric aggregation, check for applicable date histogram shape
88+
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory)) {
89+
continue;
8090
}
91+
return null;
8192
}
8293

8394
// need to cache star tree values only for multiple aggregations
@@ -100,63 +111,86 @@ private static StarTreeQueryContext tryCreateStarTreeQueryContext(
100111
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
101112
queryMap = null;
102113
} else if (queryBuilder instanceof TermQueryBuilder) {
114+
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) queryBuilder;
103115
// TODO: Add support for keyword fields
104-
if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) {
105-
// return null for non-numeric fields
106-
return null;
107-
}
108-
109-
List<String> supportedDimensions = compositeFieldType.getDimensions()
116+
Dimension matchedDimension = compositeFieldType.getDimensions()
110117
.stream()
111-
.map(Dimension::getField)
112-
.collect(Collectors.toList());
113-
queryMap = getStarTreePredicates(queryBuilder, supportedDimensions);
114-
if (queryMap == null) {
118+
.filter(d -> (d.getField().equals(termQueryBuilder.fieldName()) && d.getDocValuesType() == DocValuesType.SORTED_NUMERIC))
119+
.findFirst()
120+
.orElse(null);
121+
if (matchedDimension == null) {
115122
return null;
116123
}
124+
queryMap = Map.of(termQueryBuilder.fieldName(), Long.parseLong(termQueryBuilder.value().toString()));
117125
} else {
118126
return null;
119127
}
120128
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize);
121129
}
122130

123-
/**
124-
* Parse query body to star-tree predicates
125-
* @param queryBuilder to match star-tree supported query shape
126-
* @return predicates to match
127-
*/
128-
private static Map<String, Long> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
129-
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
130-
String field = tq.fieldName();
131-
if (!supportedDimensions.contains(field)) {
132-
return null;
133-
}
134-
long inputQueryVal = Long.parseLong(tq.value().toString());
135-
136-
// Create a map with the field and the value
137-
Map<String, Long> predicateMap = new HashMap<>();
138-
predicateMap.put(field, inputQueryVal);
139-
return predicateMap;
140-
}
141-
142-
private static MetricStat validateStarTreeMetricSupport(
131+
private static boolean validateStarTreeMetricSupport(
143132
CompositeDataCubeFieldType compositeIndexFieldInfo,
144133
AggregatorFactory aggregatorFactory
145134
) {
146135
if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) {
136+
MetricAggregatorFactory metricAggregatorFactory = (MetricAggregatorFactory) aggregatorFactory;
147137
String field;
148138
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
149139
.stream()
150140
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));
151141

152-
MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
153-
field = ((MetricAggregatorFactory) aggregatorFactory).getField();
142+
MetricStat metricStat = metricAggregatorFactory.getMetricStat();
143+
field = metricAggregatorFactory.getField();
144+
145+
return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat);
146+
}
147+
return false;
148+
}
149+
150+
private static boolean validateDateHistogramSupport(
151+
CompositeDataCubeFieldType compositeIndexFieldInfo,
152+
AggregatorFactory aggregatorFactory
153+
) {
154+
if (!(aggregatorFactory instanceof DateHistogramAggregatorFactory)
155+
|| aggregatorFactory.getSubFactories().getFactories().length < 1) {
156+
return false;
157+
}
158+
DateHistogramAggregatorFactory dateHistogramAggregatorFactory = (DateHistogramAggregatorFactory) aggregatorFactory;
159+
160+
// Find the DateDimension in the dimensions list
161+
DateDimension starTreeDateDimension = null;
162+
for (Dimension dimension : compositeIndexFieldInfo.getDimensions()) {
163+
if (dimension instanceof DateDimension) {
164+
starTreeDateDimension = (DateDimension) dimension;
165+
break;
166+
}
167+
}
168+
169+
// If no DateDimension is found, validation fails
170+
if (starTreeDateDimension == null) {
171+
return false;
172+
}
173+
174+
// Ensure the rounding is not null
175+
if (dateHistogramAggregatorFactory.getRounding() == null) {
176+
return false;
177+
}
178+
179+
// Find the closest valid interval in the DateTimeUnitRounding class associated with star tree
180+
DateTimeUnitRounding rounding = starTreeDateDimension.findClosestValidInterval(
181+
new DateTimeUnitAdapter(dateHistogramAggregatorFactory.getRounding())
182+
);
183+
if (rounding == null) {
184+
return false;
185+
}
154186

155-
if (field != null && supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
156-
return metricStat;
187+
// Validate all sub-factories
188+
for (AggregatorFactory subFactory : aggregatorFactory.getSubFactories().getFactories()) {
189+
if (!validateStarTreeMetricSupport(compositeIndexFieldInfo, subFactory)) {
190+
return false;
157191
}
158192
}
159-
return null;
193+
return true;
160194
}
161195

162196
public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
@@ -222,11 +256,37 @@ public static LeafBucketCollector getStarTreeLeafCollector(
222256
// Call the final consumer after processing all entries
223257
finalConsumer.run();
224258

225-
// Return a LeafBucketCollector that terminates collection
226-
return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
259+
// Terminate after pre-computing aggregation
260+
throw new CollectionTerminatedException();
261+
}
262+
263+
public static StarTreeBucketCollector getStarTreeBucketMetricCollector(
264+
CompositeIndexFieldInfo starTree,
265+
String metric,
266+
ValuesSource.Numeric valuesSource,
267+
StarTreeBucketCollector parentCollector,
268+
Consumer<Long> growArrays,
269+
BiConsumer<Long, Long> updateBucket
270+
) throws IOException {
271+
assert parentCollector != null;
272+
return new StarTreeBucketCollector(parentCollector) {
273+
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
274+
starTree.getField(),
275+
((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(),
276+
metric
277+
);
278+
SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
279+
.getMetricValuesIterator(metricName);
280+
227281
@Override
228-
public void collect(int doc, long bucket) {
229-
throw new CollectionTerminatedException();
282+
public void collectStarTreeEntry(int starTreeEntryBit, long bucket) throws IOException {
283+
growArrays.accept(bucket);
284+
// Advance the valuesIterator to the current bit
285+
if (!metricValuesIterator.advanceExact(starTreeEntryBit)) {
286+
return; // Skip if no entries for this document
287+
}
288+
long metricValue = metricValuesIterator.nextValue();
289+
updateBucket.accept(bucket, metricValue);
230290
}
231291
};
232292
}
@@ -240,7 +300,7 @@ public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafR
240300
throws IOException {
241301
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
242302
if (result == null) {
243-
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
303+
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap(), Set.of());
244304
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
245305
}
246306
return result;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations;
10+
11+
import org.apache.lucene.util.FixedBitSet;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
14+
15+
import java.io.IOException;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
/**
20+
* Collector for star tree aggregation
21+
* This abstract class exposes utilities to help avoid traversing star-tree multiple times and
22+
* collect relevant metrics across nested aggregations in a single traversal
23+
* @opensearch.internal
24+
*/
25+
@ExperimentalApi
26+
public abstract class StarTreeBucketCollector {
27+
28+
protected final StarTreeValues starTreeValues;
29+
protected final FixedBitSet matchingDocsBitSet;
30+
protected final List<StarTreeBucketCollector> subCollectors = new ArrayList<>();
31+
32+
public StarTreeBucketCollector(StarTreeValues starTreeValues, FixedBitSet matchingDocsBitSet) throws IOException {
33+
this.starTreeValues = starTreeValues;
34+
this.matchingDocsBitSet = matchingDocsBitSet;
35+
this.setSubCollectors();
36+
}
37+
38+
public StarTreeBucketCollector(StarTreeBucketCollector parent) throws IOException {
39+
this.starTreeValues = parent.getStarTreeValues();
40+
this.matchingDocsBitSet = parent.getMatchingDocsBitSet();
41+
this.setSubCollectors();
42+
}
43+
44+
/**
45+
* Sets the sub-collectors to track nested aggregators
46+
*/
47+
public void setSubCollectors() throws IOException {};
48+
49+
/**
50+
* Returns a list of sub-collectors to track nested aggregators
51+
*/
52+
public List<StarTreeBucketCollector> getSubCollectors() {
53+
return subCollectors;
54+
}
55+
56+
/**
57+
* Returns the tree values to iterate
58+
*/
59+
public StarTreeValues getStarTreeValues() {
60+
return starTreeValues;
61+
}
62+
63+
/**
64+
* Returns the matching docs bitset to iterate upon the star-tree values based on search query
65+
*/
66+
public FixedBitSet getMatchingDocsBitSet() {
67+
return matchingDocsBitSet;
68+
}
69+
70+
/**
71+
* Collects the star tree entry and bucket ordinal to update
72+
* The method implementation should identify the metrics to collect from that star-tree entry to the specified bucket
73+
*/
74+
public abstract void collectStarTreeEntry(int starTreeEntry, long bucket) throws IOException;
75+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations;
10+
11+
import org.apache.lucene.index.LeafReaderContext;
12+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* This interface is used to pre-compute the star tree bucket collector for each segment/leaf.
18+
* It is utilized by parent aggregation to retrieve a StarTreeBucketCollector which can be used to
19+
* pre-compute the associated aggregation along with its parent pre-computation using star-tree
20+
*
21+
* @opensearch.internal
22+
*/
23+
public interface StarTreePreComputeCollector {
24+
/**
25+
* Get the star tree bucket collector for the specified segment/leaf
26+
*/
27+
StarTreeBucketCollector getStarTreeBucketCollector(
28+
LeafReaderContext ctx,
29+
CompositeIndexFieldInfo starTree,
30+
StarTreeBucketCollector parentCollector
31+
) throws IOException;
32+
}

0 commit comments

Comments
 (0)