Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor the Cache.CacheStats class to use the Builder pattern instead of constructors ([#20015](https://github.com/opensearch-project/OpenSearch/pull/20015))
- Refactor the HttpStats, ScriptStats, AdaptiveSelectionStats and OsStats class to use the Builder pattern instead of constructors ([#20014](https://github.com/opensearch-project/OpenSearch/pull/20014))
- Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))

- Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
- Add skiplist optimization to auto_date_histogram aggregation ([#20057](https://github.com/opensearch-project/OpenSearch/pull/20057))
- Throw exceptions for currently unsupported GRPC request-side fields ([#20162](https://github.com/opensearch-project/OpenSearch/pull/20162))

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,39 @@
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.Rounding;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorBase;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;
import java.util.function.LongFunction;
import java.util.function.Supplier;

/**
* Histogram collection logic using skip list.
*
* Currently, it can only handle one owningBucketOrd at a time.
*
* @opensearch.internal
*/
public class HistogramSkiplistLeafCollector extends LeafBucketCollector {

private final NumericDocValues values;
private final DocValuesSkipper skipper;
private final Rounding.Prepared preparedRounding;
private final LongKeyedBucketOrds bucketOrds;
private final LeafBucketCollector sub;
private final boolean isSubNoOp;
private final BucketsAggregator aggregator;

/**
* Supplier function to get the current preparedRounding from the parent aggregator.
* This allows detection of rounding changes in AutoDateHistogramAggregator.
*/
private final LongFunction<Rounding.Prepared> preparedRoundingSupplier;
private final Supplier<LongKeyedBucketOrds> bucketOrdsSupplier;
private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded;

/**
* Max doc ID (inclusive) up to which all docs values may map to the same
* bucket.
Expand All @@ -48,20 +62,43 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
*/
private long upToBucketIndex;

/**
* Tracks the last preparedRounding reference to detect rounding changes.
* Used for cache invalidation when AutoDateHistogramAggregator changes rounding.
*/
private Rounding.Prepared lastPreparedRounding;

/**
 * Creates a collector bound to a single, fixed rounding and a single, fixed set of bucket ordinals.
 *
 * Delegates to the supplier-based constructor: the given {@code preparedRounding} is returned for
 * every owning bucket ordinal, the given {@code bucketOrds} is returned by the ordinals supplier,
 * and the rounding callback does nothing.
 */
public HistogramSkiplistLeafCollector(
    NumericDocValues values,
    DocValuesSkipper skipper,
    Rounding.Prepared preparedRounding,
    LongKeyedBucketOrds bucketOrds,
    LeafBucketCollector sub,
    BucketsAggregator aggregator
) {
    this(
        values,
        skipper,
        // Same rounding regardless of which owning bucket is asked for.
        ignoredOrd -> preparedRounding,
        // Same ordinals instance on every lookup.
        () -> bucketOrds,
        sub,
        aggregator,
        // Nothing to do after a bucket is collected.
        (ignoredOrd, ignoredRounded) -> {}
    );
}

/**
 * Constructor that accepts suppliers for dynamic rounding (used by AutoDateHistogramAggregator).
 *
 * @param values doc values read per document when a range cannot be bulk-assigned to one bucket
 * @param skipper skipper queried for per-level doc ID and value ranges
 * @param preparedRoundingSupplier returns the rounding to apply for a given owning bucket ordinal
 * @param bucketOrdsSupplier returns the bucket ordinals that rounded keys are added to
 * @param sub sub-collector; compared against {@code NO_OP_COLLECTOR} to precompute {@code isSubNoOp}
 * @param aggregator aggregator used to collect buckets and increment bucket doc counts
 * @param increaseRoundingIfNeeded callback invoked with the owning bucket ordinal and rounded value
 */
public HistogramSkiplistLeafCollector(
NumericDocValues values,
DocValuesSkipper skipper,
LongFunction<Rounding.Prepared> preparedRoundingSupplier,
Supplier<LongKeyedBucketOrds> bucketOrdsSupplier,
LeafBucketCollector sub,
BucketsAggregator aggregator,
IncreaseRoundingIfNeeded increaseRoundingIfNeeded
) {
this.values = values;
this.skipper = skipper;
// NOTE(review): preparedRounding and bucketOrds are not parameters of this constructor; the two
// assignments below look like pre-change lines left over from the diff view — confirm and drop.
this.preparedRounding = preparedRounding;
this.bucketOrds = bucketOrds;
this.preparedRoundingSupplier = preparedRoundingSupplier;
this.bucketOrdsSupplier = bucketOrdsSupplier;
this.sub = sub;
// Cache the identity comparison once so it is not repeated on each use.
this.isSubNoOp = (sub == NO_OP_COLLECTOR);
this.aggregator = aggregator;
this.increaseRoundingIfNeeded = increaseRoundingIfNeeded;
}

@Override
Expand All @@ -87,17 +124,20 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {

upToInclusive = skipper.maxDocID(0);

// Get current rounding from supplier
Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd);

// Now find the highest level where all docs map to the same bucket.
for (int level = 0; level < skipper.numLevels(); ++level) {
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
long minBucket = preparedRounding.round(skipper.minValue(level));
long maxBucket = preparedRounding.round(skipper.maxValue(level));
long minBucket = currentRounding.round(skipper.minValue(level));
long maxBucket = currentRounding.round(skipper.maxValue(level));

if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
// All docs at this level have a value, and all values map to the same bucket.
upToInclusive = skipper.maxDocID(level);
upToSameBucket = true;
upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
upToBucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, maxBucket);
if (upToBucketIndex < 0) {
upToBucketIndex = -1 - upToBucketIndex;
}
Expand All @@ -109,6 +149,16 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd);

// Check if rounding changed (using reference equality)
// AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes
if (currentRounding != lastPreparedRounding) {
upToInclusive = -1; // Invalidate
upToSameBucket = false;
lastPreparedRounding = currentRounding;
}

if (doc > upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
}
Expand All @@ -118,12 +168,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
sub.collect(doc, upToBucketIndex);
} else if (values.advanceExact(doc)) {
final long value = values.longValue();
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
long rounded = currentRounding.round(value);
long bucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, rounded);
if (bucketIndex < 0) {
bucketIndex = -1 - bucketIndex;
aggregator.collectExistingBucket(sub, doc, bucketIndex);
} else {
aggregator.collectBucket(sub, doc, bucketIndex);
increaseRoundingIfNeeded.accept(owningBucketOrd, rounded);
}
}
}
Expand All @@ -136,15 +188,14 @@ public void collect(DocIdStream stream) throws IOException {

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
// This will only be called if its the sub aggregation
for (;;) {
int upToExclusive = upToInclusive + 1;
if (upToExclusive < 0) { // overflow
upToExclusive = Integer.MAX_VALUE;
}

if (upToSameBucket) {
if (sub == NO_OP_COLLECTOR) {
if (isSubNoOp) {
// stream.count may be faster when we don't need to handle sub-aggs
long count = stream.count(upToExclusive);
aggregator.incrementBucketDocCount(upToBucketIndex, count);
Expand All @@ -167,4 +218,30 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
}
}
}

/**
 * Callback for auto date histogram.
 *
 * The collector passes the owning bucket ordinal and the rounded value to this callback.
 * Declared as a functional interface so it can be supplied as a lambda; the annotation makes the
 * compiler reject any change that would add a second abstract method and break lambda callers.
 *
 * @opensearch.internal
 */
@FunctionalInterface
public interface IncreaseRoundingIfNeeded {
    /**
     * Receives the owning bucket ordinal and the rounded value.
     *
     * @param owningBucket ordinal of the owning bucket
     * @param rounded the rounded value
     */
    void accept(long owningBucket, long rounded);
}

/**
 * Decides whether the skiplist-based collector may be used.
 *
 * The skiplist is usable only when both a skipper and a singleton doc values instance are present
 * and no hard bounds are set. Given that, it is allowed for a top level agg (null parent), or for a
 * parent that is an {@link AggregatorBase} whose leaf collection mode is {@code FILTER_REWRITE}.
 *
 * @param hardBounds hard bounds of the histogram; any non-null value disables the skiplist
 * @param parent parent aggregator, or null for a top level agg
 * @param skipper doc values skipper, or null if unavailable
 * @param singleton single-valued numeric doc values, or null if unavailable
 * @return true if the skiplist collector can be used, false otherwise
 */
public static boolean canUseSkiplist(LongBounds hardBounds, Aggregator parent, DocValuesSkipper skipper, NumericDocValues singleton) {
    // TODO: add hard bounds support
    final boolean inputsUsable = skipper != null && singleton != null && hardBounds == null;
    if (!inputsUsable) {
        return false;
    }

    if (parent == null) {
        return true;
    }

    return parent instanceof AggregatorBase base
        && base.getLeafCollectorMode() == AggregatorBase.LeafCollectionMode.FILTER_REWRITE;
}
}
Loading
Loading