From 3079202eb53b8292ba0be4c2002d8d51bd0c261b Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Wed, 19 Nov 2025 10:22:08 -0800 Subject: [PATCH 1/9] Initial work to get auto date histogram to use skiplist Signed-off-by: Asim Mahmood --- .../HistogramSkiplistLeafCollector.java | 53 +- .../AutoDateHistogramAggregator.java | 45 +- .../AutoDateHistogramAggregatorTests.java | 932 ++++++++++++++++++ 3 files changed, 1021 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java index acbc62cb18024..6260934bbbea8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -27,10 +27,15 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector { private final NumericDocValues values; private final DocValuesSkipper skipper; - private final Rounding.Prepared preparedRounding; private final LongKeyedBucketOrds bucketOrds; private final LeafBucketCollector sub; private final BucketsAggregator aggregator; + + /** + * Supplier function to get the current preparedRounding from the parent aggregator. + * This allows detection of rounding changes in AutoDateHistogramAggregator. + */ + private final java.util.function.Supplier preparedRoundingSupplier; /** * Max doc ID (inclusive) up to which all docs values may map to the same @@ -48,6 +53,12 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector { */ private long upToBucketIndex; + /** + * Tracks the last preparedRounding reference to detect rounding changes. + * Used for cache invalidation when AutoDateHistogramAggregator changes rounding. + */ + private Rounding.Prepared lastPreparedRounding; + public HistogramSkiplistLeafCollector( NumericDocValues values, DocValuesSkipper skipper, @@ -58,7 +69,26 @@ public HistogramSkiplistLeafCollector( ) { this.values = values; this.skipper = skipper; - this.preparedRounding = preparedRounding; + this.preparedRoundingSupplier = () -> preparedRounding; + this.bucketOrds = bucketOrds; + this.sub = sub; + this.aggregator = aggregator; + } + + /** + * Constructor that accepts a supplier for dynamic rounding (used by AutoDateHistogramAggregator). + */ + public HistogramSkiplistLeafCollector( + NumericDocValues values, + DocValuesSkipper skipper, + java.util.function.Supplier preparedRoundingSupplier, + LongKeyedBucketOrds bucketOrds, + LeafBucketCollector sub, + BucketsAggregator aggregator + ) { + this.values = values; + this.skipper = skipper; + this.preparedRoundingSupplier = preparedRoundingSupplier; this.bucketOrds = bucketOrds; this.sub = sub; this.aggregator = aggregator; @@ -87,11 +117,14 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { upToInclusive = skipper.maxDocID(0); + // Get current rounding from supplier + Rounding.Prepared currentRounding = preparedRoundingSupplier.get(); + // Now find the highest level where all docs map to the same bucket. 
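// Walk the skipper levels from finest (level 0) to coarsest: keep widening the cached range
// while every doc in the level has a value (docCount == totalDocsAtLevel) and the level's
// [minValue, maxValue] rounds to a single bucket; stop at the first level where either
// condition fails.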
for (int level = 0; level < skipper.numLevels(); ++level) { int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1; - long minBucket = preparedRounding.round(skipper.minValue(level)); - long maxBucket = preparedRounding.round(skipper.maxValue(level)); + long minBucket = currentRounding.round(skipper.minValue(level)); + long maxBucket = currentRounding.round(skipper.maxValue(level)); if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) { // All docs at this level have a value, and all values map to the same bucket. @@ -109,6 +142,16 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { @Override public void collect(int doc, long owningBucketOrd) throws IOException { + // Get current rounding from supplier + Rounding.Prepared currentRounding = preparedRoundingSupplier.get(); + + // Check if rounding changed (using reference equality) + // AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes + if (currentRounding != lastPreparedRounding) { + upToInclusive = -1; // Invalidate cache + lastPreparedRounding = currentRounding; + } + if (doc > upToInclusive) { advanceSkipper(doc, owningBucketOrd); } @@ -118,7 +161,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException { sub.collect(doc, upToBucketIndex); } else if (values.advanceExact(doc)) { final long value = values.longValue(); - long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value)); + long bucketIndex = bucketOrds.add(owningBucketOrd, currentRounding.round(value)); if (bucketIndex < 0) { bucketIndex = -1 - bucketIndex; aggregator.collectExistingBucket(sub, doc, bucketIndex); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 63951953a2f5d..f82c08dcca838 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -31,7 +31,10 @@ package org.opensearch.search.aggregations.bucket.histogram; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesSkipper; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; @@ -52,6 +55,7 @@ import org.opensearch.search.aggregations.LeafBucketCollectorBase; import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator; import org.opensearch.search.aggregations.bucket.DeferringBucketCollector; +import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector; import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; @@ -85,6 +89,7 @@ * @opensearch.internal */ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator { + static AutoDateHistogramAggregator build( String name, AggregatorFactories factories, @@ -135,6 +140,7 @@ static AutoDateHistogramAggregator build( protected final int targetBuckets; protected int roundingIdx; protected 
Rounding.Prepared preparedRounding; + private final String fieldName; private final FilterRewriteOptimizationContext filterRewriteOptimizationContext; @@ -157,6 +163,9 @@ private AutoDateHistogramAggregator( this.roundingInfos = roundingInfos; this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); + this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData) + ? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName() + : null; DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() { @Override @@ -243,7 +252,7 @@ public final DeferringBucketCollector getDeferringCollector() { return deferringCollector; } - protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException; + protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) throws IOException; @Override protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException { @@ -262,7 +271,14 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc } final SortedNumericDocValues values = valuesSource.longValues(ctx); - final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub); + // Try to unwrap to single-valued NumericDocValues + final NumericDocValues singleton = DocValues.unwrapSingleton(values); + DocValuesSkipper skipper = null; + if (this.fieldName != null) { + skipper = ctx.reader().getDocValuesSkipper(this.fieldName); + } + + final LeafBucketCollector iteratingCollector = getLeafCollector(values, skipper, sub); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -370,6 +386,9 @@ private static class FromSingle extends AutoDateHistogramAggregator { private long min = Long.MAX_VALUE; private long max = Long.MIN_VALUE; + // Debug tracking counters for collector types + private int skiplistCollectorCount = 0; + FromSingle( String name, AggregatorFactories factories, @@ -402,7 +421,24 @@ protected LongKeyedBucketOrds getBucketOrds() { } @Override - protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) throws IOException { + // Check if skiplist optimization is available + // Requirements: 1.1, 1.2, 1.3, 1.4 + final NumericDocValues singleton = DocValues.unwrapSingleton(values); + if (singleton != null && skipper != null) { + // Increment skiplist collector count + skiplistCollectorCount++; + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + () -> preparedRounding, // Pass supplier to allow rounding change detection + bucketOrds, + sub, + FromSingle.this + ); + } + + // Fall back to standard LeafBucketCollectorBase when skiplist unavailable return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -513,6 +549,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I public void collectDebugInfo(BiConsumer add) { super.collectDebugInfo(add); add.accept("surviving_buckets", bucketOrds.size()); + add.accept("skiplist_collectors_used", skiplistCollectorCount); } @Override @@ -661,7 +698,7 @@ protected LongKeyedBucketOrds getBucketOrds() { } @Override - 
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) throws IOException { return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 95f56d779b088..2795fc57819be 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -60,6 +60,7 @@ import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.MultiBucketConsumerService; +import org.opensearch.search.aggregations.bucket.terms.InternalTerms; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.metrics.InternalMax; @@ -1052,6 +1053,937 @@ private Map maxAsMap(InternalAutoDateHistogram result) { return map; } + /** + * Test that verifies FromSingle.increaseRoundingIfNeeded() works correctly with skiplist collector. + * This test validates: + * 1. preparedRounding field update is visible to skiplist collector + * 2. New Rounding.Prepared instance is created on rounding change + * 3. owningBucketOrd is always 0 in FromSingle context + * 4. 
Bucket merging works correctly after rounding change + * + * Requirements: 3.1, 3.2, 3.4 + */ + public void testSkiplistCollectorWithRoundingChange() throws IOException { + // Create a dataset that will trigger rounding changes + // Start with hourly data, then add data that spans months to force rounding increase + final List dataset = new ArrayList<>(); + + // Add hourly data for first day (24 docs) + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + for (int hour = 0; hour < 24; hour++) { + dataset.add(start.plusHours(hour)); + } + + // Add data spanning several months to trigger rounding increase (30 docs) + for (int month = 0; month < 6; month++) { + for (int day = 0; day < 5; day++) { + dataset.add(start.plusMonths(month).plusDays(day * 6)); + } + } + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + // Verify that aggregation completed successfully + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created (exact count depends on rounding chosen) + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + // Verify total doc count matches input + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(54, totalDocs); // 24 + 30 = 54 docs + + // Verify buckets are in ascending order (requirement for histogram) + List buckets = histogram.getBuckets(); + for (int i = 1; i < buckets.size(); i++) { + assertTrue( + ((ZonedDateTime) buckets.get(i - 1).getKey()).isBefore((ZonedDateTime) buckets.get(i).getKey()) + ); + } + } + ); + } + + /** + * Test that verifies skiplist collector handles rounding changes correctly with sub-aggregations. + * This ensures that when rounding changes mid-collection, sub-aggregations still produce correct results. + * + * Requirements: 3.1, 3.2, 3.4 + */ + public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException { + // Create dataset that triggers rounding change + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add data spanning months to trigger rounding increase + for (int month = 0; month < 12; month++) { + for (int day = 0; day < 3; day++) { + dataset.add(start.plusMonths(month).plusDays(day * 10)); + } + } + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(8) + .field(DATE_FIELD) + .subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 8); + + // Verify sub-aggregations are present and valid + for (Histogram.Bucket bucket : histogram.getBuckets()) { + InternalStats stats = bucket.getAggregations().get("stats"); + assertNotNull(stats); + if (bucket.getDocCount() > 0) { + assertTrue(AggregationInspectionHelper.hasValue(stats)); + assertTrue(stats.getCount() > 0); + } + } + + // Verify total doc count + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(36, totalDocs); // 12 months * 3 days = 36 docs + } + ); + } + + /** + * Test that verifies bucket merging works correctly after rounding change. 
+ * This test creates a scenario where buckets must be merged when rounding increases. + * + * Requirements: 3.2, 3.4 + */ + public void testSkiplistCollectorBucketMergingAfterRoundingChange() throws IOException { + // Create dataset with fine-grained data that will be merged into coarser buckets + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add hourly data for multiple days, then add data spanning years + // This forces rounding to increase from hours -> days -> months -> years + for (int day = 0; day < 10; day++) { + for (int hour = 0; hour < 24; hour++) { + dataset.add(start.plusDays(day).plusHours(hour)); + } + } + + // Add data spanning multiple years to force coarse rounding + for (int year = 0; year < 5; year++) { + for (int month = 0; month < 12; month += 3) { + dataset.add(start.plusYears(year).plusMonths(month)); + } + } + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created and merged appropriately + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + // Verify total doc count is preserved after merging + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(260, totalDocs); // (10 days * 24 hours) + (5 years * 4 quarters) = 240 + 20 = 260 + + // Verify buckets are properly ordered + List buckets = histogram.getBuckets(); + for (int i = 1; i < buckets.size(); i++) { + assertTrue( + ((ZonedDateTime) buckets.get(i - 1).getKey()).isBefore((ZonedDateTime) buckets.get(i).getKey()) + ); + } + } + ); + } + + /** + * Test skiplist collector with terms sub-aggregation. + * Verifies that terms sub-aggregation produces correct results when using skiplist collector. 
+ * + * Requirements: 4.4 + */ + public void testSkiplistCollectorWithTermsSubAggregation() throws IOException { + // Create dataset with dates and a categorical field + final List dataset = Arrays.asList( + ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 1, 15, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 2, 15, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 3, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 3, 15, 0, 0, 0, 0, ZoneOffset.UTC) + ); + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5) + .field(DATE_FIELD) + .subAggregation(AggregationBuilders.terms("terms").field(NUMERIC_FIELD)), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created + assertFalse(histogram.getBuckets().isEmpty()); + + // Verify terms sub-aggregation is present in each bucket + for (Histogram.Bucket bucket : histogram.getBuckets()) { + if (bucket.getDocCount() > 0) { + // NUMERIC_FIELD is a long field, so it returns LongTerms (which extends InternalTerms) + InternalTerms termsAgg = bucket.getAggregations().get("terms"); + assertNotNull("Terms sub-aggregation should be present", termsAgg); + assertTrue("Terms sub-aggregation should have buckets", termsAgg.getBuckets().size() > 0); + } + } + + // Verify total doc count + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(6, totalDocs); + } + ); + } + + /** + * Test skiplist collector with stats sub-aggregation. + * Verifies that stats sub-aggregation produces correct results when using skiplist collector. + * + * Requirements: 4.4 + */ + public void testSkiplistCollectorWithStatsSubAggregation() throws IOException { + // Create dataset with dates + final List dataset = Arrays.asList( + ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 1, 15, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 2, 15, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 3, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 3, 15, 0, 0, 0, 0, ZoneOffset.UTC) + ); + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5) + .field(DATE_FIELD) + .subAggregation(AggregationBuilders.stats("stats").field(NUMERIC_FIELD)), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created + assertFalse(histogram.getBuckets().isEmpty()); + + // Verify stats sub-aggregation is present and correct in each bucket + for (Histogram.Bucket bucket : histogram.getBuckets()) { + InternalStats stats = bucket.getAggregations().get("stats"); + assertNotNull("Stats sub-aggregation should be present", stats); + + if (bucket.getDocCount() > 0) { + assertTrue("Stats sub-aggregation should have value", AggregationInspectionHelper.hasValue(stats)); + assertEquals("Stats count should match bucket doc count", bucket.getDocCount(), stats.getCount()); + assertFalse("Stats min should not be infinite", Double.isInfinite(stats.getMin())); + assertFalse("Stats max should not be infinite", Double.isInfinite(stats.getMax())); + } else { + assertFalse("Empty bucket stats should not have value", AggregationInspectionHelper.hasValue(stats)); + } + } + + // Verify total doc count + long totalDocs = histogram.getBuckets().stream() + 
.mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(6, totalDocs); + } + ); + } + + /** + * Test skiplist collector with max sub-aggregation. + * Verifies that max sub-aggregation produces correct results when using skiplist collector. + * + * Requirements: 4.4 + */ + public void testSkiplistCollectorWithMaxSubAggregation() throws IOException { + // Create dataset with dates + final List dataset = Arrays.asList( + ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 1, 15, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 2, 15, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 3, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(2020, 3, 15, 0, 0, 0, 0, ZoneOffset.UTC) + ); + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5) + .field(DATE_FIELD) + .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created + assertFalse(histogram.getBuckets().isEmpty()); + + // Verify max sub-aggregation is present and correct in each bucket + for (Histogram.Bucket bucket : histogram.getBuckets()) { + InternalMax max = bucket.getAggregations().get("max"); + assertNotNull("Max sub-aggregation should be present", max); + + if (bucket.getDocCount() > 0) { + assertTrue("Max sub-aggregation should have value", AggregationInspectionHelper.hasValue(max)); + assertFalse("Max value should not be infinite", Double.isInfinite(max.getValue())); + assertTrue("Max value should be non-negative", max.getValue() >= 0); + } else { + assertFalse("Empty bucket max should not have value", AggregationInspectionHelper.hasValue(max)); + } + } + + // Verify total doc count + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(6, totalDocs); + } + ); + } + + /** + * Test that verifies sub-aggregation results match between skiplist and standard collectors. + * This test compares results from both collectors to ensure correctness. 
+ * + * Requirements: 4.4 + */ + public void testSkiplistCollectorSubAggregationResultsMatchStandard() throws IOException { + // Create a dataset that will use skiplist collector + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add data spanning several months + for (int month = 0; month < 6; month++) { + for (int day = 0; day < 5; day++) { + dataset.add(start.plusMonths(month).plusDays(day * 6)); + } + } + + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(8) + .field(DATE_FIELD) + .subAggregation(AggregationBuilders.stats("stats").field(NUMERIC_FIELD)) + .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 8); + + // Verify both sub-aggregations are present and consistent + for (Histogram.Bucket bucket : histogram.getBuckets()) { + if (bucket.getDocCount() > 0) { + InternalStats stats = bucket.getAggregations().get("stats"); + InternalMax max = bucket.getAggregations().get("max"); + + assertNotNull("Stats sub-aggregation should be present", stats); + assertNotNull("Max sub-aggregation should be present", max); + + assertTrue("Stats should have value", AggregationInspectionHelper.hasValue(stats)); + assertTrue("Max should have value", AggregationInspectionHelper.hasValue(max)); + + // Verify consistency: max from stats should equal max aggregation + assertEquals( + "Max from stats should match max aggregation", + stats.getMax(), + max.getValue(), + 0.0001 + ); + + // Verify stats count matches bucket doc count + assertEquals( + "Stats count should match bucket doc count", + bucket.getDocCount(), + stats.getCount() + ); + } + } + + // Verify total doc count + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(30, totalDocs); // 6 months * 5 days = 30 docs + } + ); + } + + /** + * Integration test for rounding transitions with hourly data. + * Tests that bucket assignments remain correct when rounding changes from hours to days. 
+ * + * Requirements: 2.3, 3.2 + */ + public void testRoundingTransitionsWithHourlyData() throws IOException { + // Create dataset with hourly data spanning multiple days + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add 168 hours (7 days) of hourly data + for (int hour = 0; hour < 168; hour++) { + dataset.add(start.plusHours(hour)); + } + + // Test with target bucket count of 5 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); + + // Verify total doc count is preserved + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(168, totalDocs); + + // Verify buckets are ordered + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 10 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(168, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 20 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 20); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(168, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 50 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 50); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(168, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + } + + /** + * Integration test for rounding transitions with daily data. + * Tests that bucket assignments remain correct when rounding changes from days to weeks/months. 
+ * + * Requirements: 2.3, 3.2 + */ + public void testRoundingTransitionsWithDailyData() throws IOException { + // Create dataset with daily data spanning multiple months + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add 180 days (approximately 6 months) of daily data + for (int day = 0; day < 180; day++) { + dataset.add(start.plusDays(day)); + } + + // Test with target bucket count of 5 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(180, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 10 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(180, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 20 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 20); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(180, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 50 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 50); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(180, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + } + + /** + * Integration test for rounding transitions with monthly data. + * Tests that bucket assignments remain correct when rounding changes from months to quarters/years. 
+ * + * Requirements: 2.3, 3.2 + */ + public void testRoundingTransitionsWithMonthlyData() throws IOException { + // Create dataset with monthly data spanning multiple years + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add 60 months (5 years) of monthly data + for (int month = 0; month < 60; month++) { + dataset.add(start.plusMonths(month)); + } + + // Test with target bucket count of 5 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(60, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 10 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(60, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 20 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 20); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(60, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 50 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 50); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(60, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + } + + /** + * Integration test for rounding transitions with yearly data. + * Tests that bucket assignments remain correct when rounding changes from years to decades. 
+ * + * Requirements: 2.3, 3.2 + */ + public void testRoundingTransitionsWithYearlyData() throws IOException { + // Create dataset with yearly data spanning multiple decades + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2000, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add 100 years of yearly data + for (int year = 0; year < 100; year++) { + dataset.add(start.plusYears(year)); + } + + // Test with target bucket count of 5 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(100, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 10 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(100, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 20 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 20); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(100, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 50 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 50); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(100, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + } + + /** + * Integration test for multiple rounding transitions with mixed granularity data. + * Tests that bucket assignments remain correct when data triggers multiple rounding changes. + * This test creates a dataset that starts with fine-grained data (hours) and progressively + * adds coarser data (days, months, years) to force multiple rounding transitions. 
+ * + * Requirements: 2.3, 3.2 + */ + public void testMultipleRoundingTransitionsWithMixedData() throws IOException { + // Create dataset with mixed granularity that will trigger multiple rounding changes + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add hourly data for first 2 days (48 docs) + for (int hour = 0; hour < 48; hour++) { + dataset.add(start.plusHours(hour)); + } + + // Add daily data for next 30 days (30 docs) + for (int day = 2; day < 32; day++) { + dataset.add(start.plusDays(day)); + } + + // Add monthly data for next 12 months (12 docs) + for (int month = 1; month < 13; month++) { + dataset.add(start.plusMonths(month)); + } + + // Add yearly data for next 5 years (5 docs) + for (int year = 1; year < 6; year++) { + dataset.add(start.plusYears(year)); + } + + // Total: 48 + 30 + 12 + 5 = 95 docs + + // Test with target bucket count of 5 - should trigger multiple rounding increases + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(95, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 10 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(95, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 20 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 20); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(95, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + + // Test with target bucket count of 50 + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 50); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(95, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + } + + /** + * Integration test for rounding transitions with sparse data. + * Tests that bucket assignments remain correct when data has large gaps. 
+ * + * Requirements: 2.3, 3.2 + */ + public void testRoundingTransitionsWithSparseData() throws IOException { + // Create dataset with sparse data that has large gaps + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add data with increasing gaps + dataset.add(start); + dataset.add(start.plusHours(1)); + dataset.add(start.plusHours(2)); + dataset.add(start.plusDays(1)); + dataset.add(start.plusDays(2)); + dataset.add(start.plusWeeks(1)); + dataset.add(start.plusWeeks(2)); + dataset.add(start.plusMonths(1)); + dataset.add(start.plusMonths(2)); + dataset.add(start.plusMonths(6)); + dataset.add(start.plusYears(1)); + dataset.add(start.plusYears(2)); + + // Test with various target bucket counts + for (int targetBuckets : Arrays.asList(5, 10, 20)) { + testSearchCase( + DEFAULT_QUERY, + dataset, + aggregation -> aggregation.setNumBuckets(targetBuckets).field(DATE_FIELD), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= targetBuckets); + + long totalDocs = histogram.getBuckets().stream() + .mapToLong(Histogram.Bucket::getDocCount) + .sum(); + assertEquals(12, totalDocs); + + verifyBucketsOrdered(histogram.getBuckets()); + } + ); + } + } + + /** + * Helper method to verify that histogram buckets are in ascending chronological order. + */ + private void verifyBucketsOrdered(List buckets) { + for (int i = 1; i < buckets.size(); i++) { + ZonedDateTime prev = (ZonedDateTime) buckets.get(i - 1).getKey(); + ZonedDateTime curr = (ZonedDateTime) buckets.get(i).getKey(); + assertTrue( + "Buckets should be in ascending order: " + prev + " should be before " + curr, + prev.isBefore(curr) + ); + } + } + @Override public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { /* From c166985830a945479cb2884ad3947e3f41125e71 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Mon, 24 Nov 2025 09:10:50 -0800 Subject: [PATCH 2/9] Fixed rounding logic in FromSingle. Added FromMany logic when called by filter rewrite * TODO: * add more test cases, currently does not assert that skiplist was used * add auto date benchmark to http_logs * verify existing benchmarks have no issue Signed-off-by: Asim Mahmood --- .../HistogramSkiplistLeafCollector.java | 47 +- .../AutoDateHistogramAggregator.java | 293 +++--- .../AutoDateHistogramAggregatorTests.java | 909 ++---------------- 3 files changed, 270 insertions(+), 979 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java index 6260934bbbea8..32d2500c09253 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -17,25 +17,30 @@ import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import java.io.IOException; +import java.util.function.LongFunction; +import java.util.function.Supplier; /** * Histogram collection logic using skip list. * + * Currently, it can only handle one owningBucketOrd at a time. 
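+ * Callers that may see more than one owning bucket ordinal within the same leaf should use the
+ * standard per-document collector instead of this one.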
+ * * @opensearch.internal */ public class HistogramSkiplistLeafCollector extends LeafBucketCollector { private final NumericDocValues values; private final DocValuesSkipper skipper; - private final LongKeyedBucketOrds bucketOrds; private final LeafBucketCollector sub; private final BucketsAggregator aggregator; - + /** * Supplier function to get the current preparedRounding from the parent aggregator. * This allows detection of rounding changes in AutoDateHistogramAggregator. */ - private final java.util.function.Supplier preparedRoundingSupplier; + private final LongFunction preparedRoundingSupplier; + private final java.util.function.Supplier bucketOrdsSupplier; + private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded; /** * Max doc ID (inclusive) up to which all docs values may map to the same @@ -69,29 +74,32 @@ public HistogramSkiplistLeafCollector( ) { this.values = values; this.skipper = skipper; - this.preparedRoundingSupplier = () -> preparedRounding; - this.bucketOrds = bucketOrds; + this.preparedRoundingSupplier = (owningBucketOrd) -> preparedRounding; + this.bucketOrdsSupplier = () -> bucketOrds; this.sub = sub; this.aggregator = aggregator; + this.increaseRoundingIfNeeded = (owningBucketOrd, rounded) -> {}; } - + /** * Constructor that accepts a supplier for dynamic rounding (used by AutoDateHistogramAggregator). */ public HistogramSkiplistLeafCollector( NumericDocValues values, DocValuesSkipper skipper, - java.util.function.Supplier preparedRoundingSupplier, - LongKeyedBucketOrds bucketOrds, + LongFunction preparedRoundingSupplier, + Supplier bucketOrdsSupplier, LeafBucketCollector sub, - BucketsAggregator aggregator + BucketsAggregator aggregator, + IncreaseRoundingIfNeeded increaseRoundingIfNeeded ) { this.values = values; this.skipper = skipper; this.preparedRoundingSupplier = preparedRoundingSupplier; - this.bucketOrds = bucketOrds; + this.bucketOrdsSupplier = bucketOrdsSupplier; this.sub = sub; this.aggregator = aggregator; + this.increaseRoundingIfNeeded = increaseRoundingIfNeeded; } @Override @@ -118,7 +126,7 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { upToInclusive = skipper.maxDocID(0); // Get current rounding from supplier - Rounding.Prepared currentRounding = preparedRoundingSupplier.get(); + Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd); // Now find the highest level where all docs map to the same bucket. for (int level = 0; level < skipper.numLevels(); ++level) { @@ -130,7 +138,7 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { // All docs at this level have a value, and all values map to the same bucket. 
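// Cache the shared bucket ordinal so collect(...) can count the remaining docs in this range
// without re-reading doc values or re-rounding; the cache is discarded when the skipper is
// advanced past this range or when the prepared rounding instance changes.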
upToInclusive = skipper.maxDocID(level); upToSameBucket = true; - upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket); + upToBucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, maxBucket); if (upToBucketIndex < 0) { upToBucketIndex = -1 - upToBucketIndex; } @@ -142,13 +150,13 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - // Get current rounding from supplier - Rounding.Prepared currentRounding = preparedRoundingSupplier.get(); - + Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd); + // Check if rounding changed (using reference equality) // AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes if (currentRounding != lastPreparedRounding) { upToInclusive = -1; // Invalidate cache + upToSameBucket = false; lastPreparedRounding = currentRounding; } @@ -161,12 +169,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException { sub.collect(doc, upToBucketIndex); } else if (values.advanceExact(doc)) { final long value = values.longValue(); - long bucketIndex = bucketOrds.add(owningBucketOrd, currentRounding.round(value)); + long rounded = currentRounding.round(value); + long bucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, rounded); if (bucketIndex < 0) { bucketIndex = -1 - bucketIndex; aggregator.collectExistingBucket(sub, doc, bucketIndex); } else { aggregator.collectBucket(sub, doc, bucketIndex); + increaseRoundingIfNeeded.accept(owningBucketOrd, rounded); } } } @@ -179,7 +189,6 @@ public void collect(DocIdStream stream) throws IOException { @Override public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - // This will only be called if its the sub aggregation for (;;) { int upToExclusive = upToInclusive + 1; if (upToExclusive < 0) { // overflow @@ -210,4 +219,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } } } + + public interface IncreaseRoundingIfNeeded { + void accept(long owningBucket, long rounded); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index f82c08dcca838..60aac6dee38aa 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -252,7 +252,11 @@ public final DeferringBucketCollector getDeferringCollector() { return deferringCollector; } - protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) throws IOException; + protected abstract LeafBucketCollector getLeafCollector( + SortedNumericDocValues values, + DocValuesSkipper skipper, + LeafBucketCollector sub + ) throws IOException; @Override protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException { @@ -271,8 +275,6 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc } final SortedNumericDocValues values = valuesSource.longValues(ctx); - // Try to unwrap to single-valued NumericDocValues - final NumericDocValues singleton = DocValues.unwrapSingleton(values); DocValuesSkipper skipper = null; if (this.fieldName != null) { skipper = 
ctx.reader().getDocValuesSkipper(this.fieldName); @@ -287,12 +289,12 @@ public void collect(int doc, long owningBucketOrd) throws IOException { @Override public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + iteratingCollector.collect(stream, owningBucketOrd); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + iteratingCollector.collectRange(min, max); } }; } @@ -421,21 +423,26 @@ protected LongKeyedBucketOrds getBucketOrds() { } @Override - protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) throws IOException { + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) + throws IOException { // Check if skiplist optimization is available - // Requirements: 1.1, 1.2, 1.3, 1.4 final NumericDocValues singleton = DocValues.unwrapSingleton(values); if (singleton != null && skipper != null) { - // Increment skiplist collector count - skiplistCollectorCount++; - return new HistogramSkiplistLeafCollector( - singleton, - skipper, - () -> preparedRounding, // Pass supplier to allow rounding change detection - bucketOrds, - sub, - FromSingle.this - ); + // FIXME: replace isTryPrecomputePath with collector mode + if (parent == null || isTryPrecomputePath()) { + // Increment skiplist collector count + skiplistCollectorCount++; + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + (owningBucketOrd) -> preparedRounding, // for FromSingle there will be no parent/ + () -> bucketOrds, + sub, + FromSingle.this, + (owningBucket, rounded) -> increaseRoundingIfNeeded(rounded) // Pass supplier to allow rounding change + // detectionincreaseRoundingIfNeeded + ); + } } // Fall back to standard LeafBucketCollectorBase when skiplist unavailable @@ -482,62 +489,63 @@ private void collectValue(int doc, long rounded) throws IOException { increaseRoundingIfNeeded(rounded); } - /** - * Examine our current bucket count and the most recently added bucket to determine if an update to - * preparedRounding is required to keep total bucket count in compliance with targetBuckets. - * - * @param rounded the most recently collected value rounded - */ - private void increaseRoundingIfNeeded(long rounded) { - // If we are already using the rounding with the largest interval nothing can be done - if (roundingIdx >= roundingInfos.length - 1) { - return; - } + }; + } - // Re calculate the max and min values we expect to bucket according to most recently rounded val - min = Math.min(min, rounded); - max = Math.max(max, rounded); - - /** - * Quick explanation of the two below conditions: - * - * 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()] - * Represents the total bucket count possible before we will exceed targetBuckets - * even if we use the maximum inner interval of our current rounding. For example, consider the - * DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets). - * targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() would then be the number of - * 1 day buckets possible such that if we re-bucket to 1 week buckets we will have more 1 week buckets - * than our targetBuckets limit. If the current count of buckets exceeds this limit we must update - * our rounding. - * - * 2. 
[targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()] - * The total duration of ms covered by our current rounding. In the case of MINUTES_OF_HOUR rounding - * getMaximumRoughEstimateDurationMillis is 60000. If our current total range in millis (max - min) - * exceeds this range we must update our rounding. - */ - if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() - && max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) { - return; + /** + * Examine our current bucket count and the most recently added bucket to determine if an update to + * preparedRounding is required to keep total bucket count in compliance with targetBuckets. + * + * @param rounded the most recently collected value rounded + */ + private void increaseRoundingIfNeeded(long rounded) { + // If we are already using the rounding with the largest interval nothing can be done + if (roundingIdx >= roundingInfos.length - 1) { + return; + } + + // Re calculate the max and min values we expect to bucket according to most recently rounded val + min = Math.min(min, rounded); + max = Math.max(max, rounded); + + /** + * Quick explanation of the two below conditions: + * + * 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()] + * Represents the total bucket count possible before we will exceed targetBuckets + * even if we use the maximum inner interval of our current rounding. For example, consider the + * DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets). + * targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() would then be the number of + * 1 day buckets possible such that if we re-bucket to 1 week buckets we will have more 1 week buckets + * than our targetBuckets limit. If the current count of buckets exceeds this limit we must update + * our rounding. + * + * 2. [targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()] + * The total duration of ms covered by our current rounding. In the case of MINUTES_OF_HOUR rounding + * getMaximumRoughEstimateDurationMillis is 60000. If our current total range in millis (max - min) + * exceeds this range we must update our rounding. + */ + if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() + && max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) { + return; + } + do { + try (LongKeyedBucketOrds oldOrds = bucketOrds) { + preparedRounding = prepareRounding(++roundingIdx); + long[] mergeMap = new long[Math.toIntExact(oldOrds.size())]; + bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0); + while (ordsEnum.next()) { + long oldKey = ordsEnum.value(); + long newKey = preparedRounding.round(oldKey); + long newBucketOrd = bucketOrds.add(0, newKey); + mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? 
newBucketOrd : -1 - newBucketOrd; } - do { - try (LongKeyedBucketOrds oldOrds = bucketOrds) { - preparedRounding = prepareRounding(++roundingIdx); - long[] mergeMap = new long[Math.toIntExact(oldOrds.size())]; - bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0); - while (ordsEnum.next()) { - long oldKey = ordsEnum.value(); - long newKey = preparedRounding.round(oldKey); - long newBucketOrd = bucketOrds.add(0, newKey); - mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd; - } - merge(mergeMap, bucketOrds.size()); - } - } while (roundingIdx < roundingInfos.length - 1 - && (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() - || max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis())); + merge(mergeMap, bucketOrds.size()); } - }; + } while (roundingIdx < roundingInfos.length - 1 + && (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() + || max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis())); } @Override @@ -656,6 +664,8 @@ private static class FromMany extends AutoDateHistogramAggregator { */ private int rebucketCount = 0; + private int skiplistCollectorCount = 0; + FromMany( String name, AggregatorFactories factories, @@ -698,7 +708,42 @@ protected LongKeyedBucketOrds getBucketOrds() { } @Override - protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) throws IOException { + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) + throws IOException { + + final NumericDocValues singleton = DocValues.unwrapSingleton(values); + if (singleton != null && skipper != null) { + // FIXME: replace isTryPrecomputePath with collector mode + /** + * HistogramSkiplistLeafCollector in its current state can only handle one owningBucketOrd at a time. + * When parent is null, i.e. then ForSingle class will get used. ForMany is used when auto date is sub agg. + * In the special case where in FilterRewrite (SubAggRangeCollector) logic, we can use Skiplist because we + * will one range (and thus owningBucketOrd) at time. + * + * In the future we can enhance HistogramSkiplistLeafCollector to handle multiple owningBucketOrd, + * similar to FromMany. 
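+ * (In the filter-rewrite precompute path each range is collected in its own pass, so this
+ * collector only ever observes a single owningBucketOrd per leaf.)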
+ */ + if (isTryPrecomputePath()) { + // Increment skiplist collector count + skiplistCollectorCount++; + + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + (owningBucketOrd) -> preparedRoundings[roundingIndexFor(owningBucketOrd)], + () -> bucketOrds, + sub, + FromMany.this, + (owningBucketOrd, rounded) -> { + int roundingIdx = roundingIndexFor(owningBucketOrd); + liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1); + int estimatedBucketCount = liveBucketCountUnderestimate.increment(owningBucketOrd, 1); + increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); + } + ); + } + } + return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -744,61 +789,62 @@ private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long ro return increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); } - /** - * Increase the rounding of {@code owningBucketOrd} using - * estimated, bucket counts, {@link FromMany#rebucket()} rebucketing} the all - * buckets if the estimated number of wasted buckets is too high. - */ - private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucketCount, long newKey, int oldRounding) { - if (oldRounding >= roundingInfos.length - 1) { - return oldRounding; - } - if (mins.size() < owningBucketOrd + 1) { - long oldSize = mins.size(); - mins = context.bigArrays().grow(mins, owningBucketOrd + 1); - mins.fill(oldSize, mins.size(), Long.MAX_VALUE); - } - if (maxes.size() < owningBucketOrd + 1) { - long oldSize = maxes.size(); - maxes = context.bigArrays().grow(maxes, owningBucketOrd + 1); - maxes.fill(oldSize, maxes.size(), Long.MIN_VALUE); - } - - long min = Math.min(mins.get(owningBucketOrd), newKey); - mins.set(owningBucketOrd, min); - long max = Math.max(maxes.get(owningBucketOrd), newKey); - maxes.set(owningBucketOrd, max); - if (oldEstimatedBucketCount <= targetBuckets * roundingInfos[oldRounding].getMaximumInnerInterval() - && max - min <= targetBuckets * roundingInfos[oldRounding].getMaximumRoughEstimateDurationMillis()) { - return oldRounding; - } - long oldRoughDuration = roundingInfos[oldRounding].roughEstimateDurationMillis; - int newRounding = oldRounding; - int newEstimatedBucketCount; - do { - newRounding++; - double ratio = (double) oldRoughDuration / (double) roundingInfos[newRounding].getRoughEstimateDurationMillis(); - newEstimatedBucketCount = (int) Math.ceil(oldEstimatedBucketCount * ratio); - } while (newRounding < roundingInfos.length - 1 - && (newEstimatedBucketCount > targetBuckets * roundingInfos[newRounding].getMaximumInnerInterval() - || max - min > targetBuckets * roundingInfos[newRounding].getMaximumRoughEstimateDurationMillis())); - setRounding(owningBucketOrd, newRounding); - mins.set(owningBucketOrd, preparedRoundings[newRounding].round(mins.get(owningBucketOrd))); - maxes.set(owningBucketOrd, preparedRoundings[newRounding].round(maxes.get(owningBucketOrd))); - wastedBucketsOverestimate += oldEstimatedBucketCount - newEstimatedBucketCount; - if (wastedBucketsOverestimate > nextRebucketAt) { - rebucket(); - // Bump the threshold for the next rebucketing - wastedBucketsOverestimate = 0; - nextRebucketAt *= 2; - } else { - liveBucketCountUnderestimate.set(owningBucketOrd, newEstimatedBucketCount); - } - return newRounding; - } }; } + /** + * Increase the rounding of {@code owningBucketOrd} using + * estimated, 
bucket counts, {@link FromMany#rebucket()} rebucketing} the all + * buckets if the estimated number of wasted buckets is too high. + */ + private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucketCount, long newKey, int oldRounding) { + if (oldRounding >= roundingInfos.length - 1) { + return oldRounding; + } + if (mins.size() < owningBucketOrd + 1) { + long oldSize = mins.size(); + mins = context.bigArrays().grow(mins, owningBucketOrd + 1); + mins.fill(oldSize, mins.size(), Long.MAX_VALUE); + } + if (maxes.size() < owningBucketOrd + 1) { + long oldSize = maxes.size(); + maxes = context.bigArrays().grow(maxes, owningBucketOrd + 1); + maxes.fill(oldSize, maxes.size(), Long.MIN_VALUE); + } + + long min = Math.min(mins.get(owningBucketOrd), newKey); + mins.set(owningBucketOrd, min); + long max = Math.max(maxes.get(owningBucketOrd), newKey); + maxes.set(owningBucketOrd, max); + if (oldEstimatedBucketCount <= targetBuckets * roundingInfos[oldRounding].getMaximumInnerInterval() + && max - min <= targetBuckets * roundingInfos[oldRounding].getMaximumRoughEstimateDurationMillis()) { + return oldRounding; + } + long oldRoughDuration = roundingInfos[oldRounding].roughEstimateDurationMillis; + int newRounding = oldRounding; + int newEstimatedBucketCount; + do { + newRounding++; + double ratio = (double) oldRoughDuration / (double) roundingInfos[newRounding].getRoughEstimateDurationMillis(); + newEstimatedBucketCount = (int) Math.ceil(oldEstimatedBucketCount * ratio); + } while (newRounding < roundingInfos.length - 1 + && (newEstimatedBucketCount > targetBuckets * roundingInfos[newRounding].getMaximumInnerInterval() + || max - min > targetBuckets * roundingInfos[newRounding].getMaximumRoughEstimateDurationMillis())); + setRounding(owningBucketOrd, newRounding); + mins.set(owningBucketOrd, preparedRoundings[newRounding].round(mins.get(owningBucketOrd))); + maxes.set(owningBucketOrd, preparedRoundings[newRounding].round(maxes.get(owningBucketOrd))); + wastedBucketsOverestimate += oldEstimatedBucketCount - newEstimatedBucketCount; + if (wastedBucketsOverestimate > nextRebucketAt) { + rebucket(); + // Bump the threshold for the next rebucketing + wastedBucketsOverestimate = 0; + nextRebucketAt *= 2; + } else { + liveBucketCountUnderestimate.set(owningBucketOrd, newEstimatedBucketCount); + } + return newRounding; + } + private void rebucket() { rebucketCount++; try (LongKeyedBucketOrds oldOrds = bucketOrds) { @@ -843,6 +889,7 @@ public void collectDebugInfo(BiConsumer add) { add.accept("wasted_buckets_overestimate", wastedBucketsOverestimate); add.accept("next_rebucket_at", nextRebucketAt); add.accept("rebucket_count", rebucketCount); + add.accept("skiplist_collectors_used", skiplistCollectorCount); } private void setRounding(long owningBucketOrd, int newRounding) { diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 2795fc57819be..5eab3da2b32fe 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -60,7 +60,6 @@ import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.MultiBucketConsumerService; -import 
org.opensearch.search.aggregations.bucket.terms.InternalTerms; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.metrics.InternalMax; @@ -96,7 +95,7 @@ import static org.hamcrest.Matchers.hasSize; public class AutoDateHistogramAggregatorTests extends DateHistogramAggregatorTestCase { - private static final String DATE_FIELD = "date"; + private static final String DATE_FIELD = "@timestamp"; private static final String INSTANT_FIELD = "instant"; private static final String NUMERIC_FIELD = "numeric"; @@ -987,10 +986,20 @@ private void testSearchCase( final List dataset, final Consumer configure, final Consumer verify + ) throws IOException { + testSearchCase(query, dataset, false, configure, verify); + } + + private void testSearchCase( + final Query query, + final List dataset, + final boolean enableSkiplist, + final Consumer configure, + final Consumer verify ) throws IOException { try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { - indexSampleData(dataset, indexWriter); + indexSampleData(dataset, indexWriter, enableSkiplist); } try (IndexReader indexReader = DirectoryReader.open(directory)) { @@ -1020,11 +1029,19 @@ private void testSearchCase( } private void indexSampleData(List dataset, RandomIndexWriter indexWriter) throws IOException { + indexSampleData(dataset, indexWriter, false); + } + + private void indexSampleData(List dataset, RandomIndexWriter indexWriter, boolean enableSkiplist) throws IOException { final Document document = new Document(); int i = 0; for (final ZonedDateTime date : dataset) { final long instant = date.toInstant().toEpochMilli(); - document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); + if (enableSkiplist) { + document.add(SortedNumericDocValuesField.indexedField(DATE_FIELD, instant)); + } else { + document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); + } document.add(new LongPoint(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); @@ -1060,87 +1077,79 @@ private Map maxAsMap(InternalAutoDateHistogram result) { * 2. New Rounding.Prepared instance is created on rounding change * 3. owningBucketOrd is always 0 in FromSingle context * 4. 
Bucket merging works correctly after rounding change - * + * * Requirements: 3.1, 3.2, 3.4 */ public void testSkiplistCollectorWithRoundingChange() throws IOException { // Create a dataset that will trigger rounding changes // Start with hourly data, then add data that spans months to force rounding increase final List dataset = new ArrayList<>(); - + // Add hourly data for first day (24 docs) ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); for (int hour = 0; hour < 24; hour++) { dataset.add(start.plusHours(hour)); } - + // Add data spanning several months to trigger rounding increase (30 docs) for (int month = 0; month < 6; month++) { for (int day = 0; day < 5; day++) { dataset.add(start.plusMonths(month).plusDays(day * 6)); } } - - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - // Verify that aggregation completed successfully - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - - // Verify buckets were created (exact count depends on rounding chosen) - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - // Verify total doc count matches input - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(54, totalDocs); // 24 + 30 = 54 docs - - // Verify buckets are in ascending order (requirement for histogram) - List buckets = histogram.getBuckets(); - for (int i = 1; i < buckets.size(); i++) { - assertTrue( - ((ZonedDateTime) buckets.get(i - 1).getKey()).isBefore((ZonedDateTime) buckets.get(i).getKey()) - ); - } + + testSearchCase(DEFAULT_QUERY, dataset, true, aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), histogram -> { + // Verify that aggregation completed successfully + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created (exact count depends on rounding chosen) + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + // Verify total doc count matches input + long totalDocs = histogram.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + assertEquals(54, totalDocs); // 24 + 30 = 54 docs + + // Verify buckets are in ascending order (requirement for histogram) + List buckets = histogram.getBuckets(); + for (int i = 1; i < buckets.size(); i++) { + assertTrue(((ZonedDateTime) buckets.get(i - 1).getKey()).isBefore((ZonedDateTime) buckets.get(i).getKey())); } - ); + }); } /** * Test that verifies skiplist collector handles rounding changes correctly with sub-aggregations. * This ensures that when rounding changes mid-collection, sub-aggregations still produce correct results. 
- * + * * Requirements: 3.1, 3.2, 3.4 */ public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException { // Create dataset that triggers rounding change final List dataset = new ArrayList<>(); ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - + // Add data spanning months to trigger rounding increase for (int month = 0; month < 12; month++) { for (int day = 0; day < 3; day++) { dataset.add(start.plusMonths(month).plusDays(day * 10)); } } - + testSearchCase( DEFAULT_QUERY, dataset, + true, aggregation -> aggregation.setNumBuckets(8) .field(DATE_FIELD) .subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)), histogram -> { assertTrue(AggregationInspectionHelper.hasValue(histogram)); - + // Verify buckets were created assertFalse(histogram.getBuckets().isEmpty()); assertTrue(histogram.getBuckets().size() <= 8); - + // Verify sub-aggregations are present and valid for (Histogram.Bucket bucket : histogram.getBuckets()) { InternalStats stats = bucket.getAggregations().get("stats"); @@ -1150,11 +1159,9 @@ public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException assertTrue(stats.getCount() > 0); } } - + // Verify total doc count - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); + long totalDocs = histogram.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); assertEquals(36, totalDocs); // 12 months * 3 days = 36 docs } ); @@ -1163,825 +1170,49 @@ public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException /** * Test that verifies bucket merging works correctly after rounding change. * This test creates a scenario where buckets must be merged when rounding increases. - * + * * Requirements: 3.2, 3.4 */ public void testSkiplistCollectorBucketMergingAfterRoundingChange() throws IOException { // Create dataset with fine-grained data that will be merged into coarser buckets final List dataset = new ArrayList<>(); ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - + // Add hourly data for multiple days, then add data spanning years // This forces rounding to increase from hours -> days -> months -> years - for (int day = 0; day < 10; day++) { + for (int day = 0; day < 5; day++) { for (int hour = 0; hour < 24; hour++) { dataset.add(start.plusDays(day).plusHours(hour)); } } - + // Add data spanning multiple years to force coarse rounding for (int year = 0; year < 5; year++) { for (int month = 0; month < 12; month += 3) { dataset.add(start.plusYears(year).plusMonths(month)); } } - - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - - // Verify buckets were created and merged appropriately - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - // Verify total doc count is preserved after merging - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(260, totalDocs); // (10 days * 24 hours) + (5 years * 4 quarters) = 240 + 20 = 260 - - // Verify buckets are properly ordered - List buckets = histogram.getBuckets(); - for (int i = 1; i < buckets.size(); i++) { - assertTrue( - ((ZonedDateTime) buckets.get(i - 1).getKey()).isBefore((ZonedDateTime) buckets.get(i).getKey()) - ); - } - } - ); - } - - /** - * Test skiplist collector with terms 
sub-aggregation. - * Verifies that terms sub-aggregation produces correct results when using skiplist collector. - * - * Requirements: 4.4 - */ - public void testSkiplistCollectorWithTermsSubAggregation() throws IOException { - // Create dataset with dates and a categorical field - final List dataset = Arrays.asList( - ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 1, 15, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 2, 15, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 3, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 3, 15, 0, 0, 0, 0, ZoneOffset.UTC) - ); - - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5) - .field(DATE_FIELD) - .subAggregation(AggregationBuilders.terms("terms").field(NUMERIC_FIELD)), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - - // Verify buckets were created - assertFalse(histogram.getBuckets().isEmpty()); - - // Verify terms sub-aggregation is present in each bucket - for (Histogram.Bucket bucket : histogram.getBuckets()) { - if (bucket.getDocCount() > 0) { - // NUMERIC_FIELD is a long field, so it returns LongTerms (which extends InternalTerms) - InternalTerms termsAgg = bucket.getAggregations().get("terms"); - assertNotNull("Terms sub-aggregation should be present", termsAgg); - assertTrue("Terms sub-aggregation should have buckets", termsAgg.getBuckets().size() > 0); - } - } - - // Verify total doc count - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(6, totalDocs); - } - ); - } - - /** - * Test skiplist collector with stats sub-aggregation. - * Verifies that stats sub-aggregation produces correct results when using skiplist collector. 
- * - * Requirements: 4.4 - */ - public void testSkiplistCollectorWithStatsSubAggregation() throws IOException { - // Create dataset with dates - final List dataset = Arrays.asList( - ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 1, 15, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 2, 15, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 3, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 3, 15, 0, 0, 0, 0, ZoneOffset.UTC) - ); - - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5) - .field(DATE_FIELD) - .subAggregation(AggregationBuilders.stats("stats").field(NUMERIC_FIELD)), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - - // Verify buckets were created - assertFalse(histogram.getBuckets().isEmpty()); - - // Verify stats sub-aggregation is present and correct in each bucket - for (Histogram.Bucket bucket : histogram.getBuckets()) { - InternalStats stats = bucket.getAggregations().get("stats"); - assertNotNull("Stats sub-aggregation should be present", stats); - - if (bucket.getDocCount() > 0) { - assertTrue("Stats sub-aggregation should have value", AggregationInspectionHelper.hasValue(stats)); - assertEquals("Stats count should match bucket doc count", bucket.getDocCount(), stats.getCount()); - assertFalse("Stats min should not be infinite", Double.isInfinite(stats.getMin())); - assertFalse("Stats max should not be infinite", Double.isInfinite(stats.getMax())); - } else { - assertFalse("Empty bucket stats should not have value", AggregationInspectionHelper.hasValue(stats)); - } - } - - // Verify total doc count - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(6, totalDocs); - } - ); - } - - /** - * Test skiplist collector with max sub-aggregation. - * Verifies that max sub-aggregation produces correct results when using skiplist collector. 
- * - * Requirements: 4.4 - */ - public void testSkiplistCollectorWithMaxSubAggregation() throws IOException { - // Create dataset with dates - final List dataset = Arrays.asList( - ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 1, 15, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 2, 15, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 3, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(2020, 3, 15, 0, 0, 0, 0, ZoneOffset.UTC) - ); - - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5) - .field(DATE_FIELD) - .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - - // Verify buckets were created - assertFalse(histogram.getBuckets().isEmpty()); - - // Verify max sub-aggregation is present and correct in each bucket - for (Histogram.Bucket bucket : histogram.getBuckets()) { - InternalMax max = bucket.getAggregations().get("max"); - assertNotNull("Max sub-aggregation should be present", max); - - if (bucket.getDocCount() > 0) { - assertTrue("Max sub-aggregation should have value", AggregationInspectionHelper.hasValue(max)); - assertFalse("Max value should not be infinite", Double.isInfinite(max.getValue())); - assertTrue("Max value should be non-negative", max.getValue() >= 0); - } else { - assertFalse("Empty bucket max should not have value", AggregationInspectionHelper.hasValue(max)); - } - } - - // Verify total doc count - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(6, totalDocs); - } - ); - } - - /** - * Test that verifies sub-aggregation results match between skiplist and standard collectors. - * This test compares results from both collectors to ensure correctness. 
- * - * Requirements: 4.4 - */ - public void testSkiplistCollectorSubAggregationResultsMatchStandard() throws IOException { - // Create a dataset that will use skiplist collector - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add data spanning several months - for (int month = 0; month < 6; month++) { - for (int day = 0; day < 5; day++) { - dataset.add(start.plusMonths(month).plusDays(day * 6)); - } - } - - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(8) - .field(DATE_FIELD) - .subAggregation(AggregationBuilders.stats("stats").field(NUMERIC_FIELD)) - .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - - // Verify buckets were created - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 8); - - // Verify both sub-aggregations are present and consistent - for (Histogram.Bucket bucket : histogram.getBuckets()) { - if (bucket.getDocCount() > 0) { - InternalStats stats = bucket.getAggregations().get("stats"); - InternalMax max = bucket.getAggregations().get("max"); - - assertNotNull("Stats sub-aggregation should be present", stats); - assertNotNull("Max sub-aggregation should be present", max); - - assertTrue("Stats should have value", AggregationInspectionHelper.hasValue(stats)); - assertTrue("Max should have value", AggregationInspectionHelper.hasValue(max)); - - // Verify consistency: max from stats should equal max aggregation - assertEquals( - "Max from stats should match max aggregation", - stats.getMax(), - max.getValue(), - 0.0001 - ); - - // Verify stats count matches bucket doc count - assertEquals( - "Stats count should match bucket doc count", - bucket.getDocCount(), - stats.getCount() - ); - } - } - - // Verify total doc count - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(30, totalDocs); // 6 months * 5 days = 30 docs - } - ); - } - - /** - * Integration test for rounding transitions with hourly data. - * Tests that bucket assignments remain correct when rounding changes from hours to days. 
- * - * Requirements: 2.3, 3.2 - */ - public void testRoundingTransitionsWithHourlyData() throws IOException { - // Create dataset with hourly data spanning multiple days - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add 168 hours (7 days) of hourly data - for (int hour = 0; hour < 168; hour++) { - dataset.add(start.plusHours(hour)); - } - - // Test with target bucket count of 5 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 5); - - // Verify total doc count is preserved - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(168, totalDocs); - - // Verify buckets are ordered - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 10 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(168, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 20 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 20); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(168, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 50 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 50); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(168, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - } - - /** - * Integration test for rounding transitions with daily data. - * Tests that bucket assignments remain correct when rounding changes from days to weeks/months. 
- * - * Requirements: 2.3, 3.2 - */ - public void testRoundingTransitionsWithDailyData() throws IOException { - // Create dataset with daily data spanning multiple months - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add 180 days (approximately 6 months) of daily data - for (int day = 0; day < 180; day++) { - dataset.add(start.plusDays(day)); - } - - // Test with target bucket count of 5 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 5); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(180, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 10 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(180, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 20 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 20); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(180, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 50 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 50); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(180, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - } - /** - * Integration test for rounding transitions with monthly data. - * Tests that bucket assignments remain correct when rounding changes from months to quarters/years. 
- * - * Requirements: 2.3, 3.2 - */ - public void testRoundingTransitionsWithMonthlyData() throws IOException { - // Create dataset with monthly data spanning multiple years - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add 60 months (5 years) of monthly data - for (int month = 0; month < 60; month++) { - dataset.add(start.plusMonths(month)); - } - - // Test with target bucket count of 5 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 5); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(60, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 10 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(60, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 20 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 20); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(60, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 50 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 50); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(60, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - } + testSearchCase(DEFAULT_QUERY, dataset, true, aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); - /** - * Integration test for rounding transitions with yearly data. - * Tests that bucket assignments remain correct when rounding changes from years to decades. 
- * - * Requirements: 2.3, 3.2 - */ - public void testRoundingTransitionsWithYearlyData() throws IOException { - // Create dataset with yearly data spanning multiple decades - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2000, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add 100 years of yearly data - for (int year = 0; year < 100; year++) { - dataset.add(start.plusYears(year)); - } - - // Test with target bucket count of 5 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 5); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(100, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 10 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(100, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 20 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 20); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(100, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 50 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 50); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(100, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - } + // Verify buckets were created and merged appropriately + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); - /** - * Integration test for multiple rounding transitions with mixed granularity data. - * Tests that bucket assignments remain correct when data triggers multiple rounding changes. - * This test creates a dataset that starts with fine-grained data (hours) and progressively - * adds coarser data (days, months, years) to force multiple rounding transitions. 
- * - * Requirements: 2.3, 3.2 - */ - public void testMultipleRoundingTransitionsWithMixedData() throws IOException { - // Create dataset with mixed granularity that will trigger multiple rounding changes - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add hourly data for first 2 days (48 docs) - for (int hour = 0; hour < 48; hour++) { - dataset.add(start.plusHours(hour)); - } - - // Add daily data for next 30 days (30 docs) - for (int day = 2; day < 32; day++) { - dataset.add(start.plusDays(day)); - } - - // Add monthly data for next 12 months (12 docs) - for (int month = 1; month < 13; month++) { - dataset.add(start.plusMonths(month)); - } - - // Add yearly data for next 5 years (5 docs) - for (int year = 1; year < 6; year++) { - dataset.add(start.plusYears(year)); - } - - // Total: 48 + 30 + 12 + 5 = 95 docs - - // Test with target bucket count of 5 - should trigger multiple rounding increases - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 5); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(95, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 10 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 10); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(95, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 20 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 20); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(95, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - - // Test with target bucket count of 50 - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= 50); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(95, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - } + // Verify total doc count is preserved after merging + long totalDocs = histogram.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + assertEquals(140, totalDocs); // (5 days * 24 hours) + (5 years * 4 quarters) = 120 + 20 = 140 - /** - * Integration test for rounding transitions with sparse data. - * Tests that bucket assignments remain correct when data has large gaps. 
- * - * Requirements: 2.3, 3.2 - */ - public void testRoundingTransitionsWithSparseData() throws IOException { - // Create dataset with sparse data that has large gaps - final List dataset = new ArrayList<>(); - ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - - // Add data with increasing gaps - dataset.add(start); - dataset.add(start.plusHours(1)); - dataset.add(start.plusHours(2)); - dataset.add(start.plusDays(1)); - dataset.add(start.plusDays(2)); - dataset.add(start.plusWeeks(1)); - dataset.add(start.plusWeeks(2)); - dataset.add(start.plusMonths(1)); - dataset.add(start.plusMonths(2)); - dataset.add(start.plusMonths(6)); - dataset.add(start.plusYears(1)); - dataset.add(start.plusYears(2)); - - // Test with various target bucket counts - for (int targetBuckets : Arrays.asList(5, 10, 20)) { - testSearchCase( - DEFAULT_QUERY, - dataset, - aggregation -> aggregation.setNumBuckets(targetBuckets).field(DATE_FIELD), - histogram -> { - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - assertFalse(histogram.getBuckets().isEmpty()); - assertTrue(histogram.getBuckets().size() <= targetBuckets); - - long totalDocs = histogram.getBuckets().stream() - .mapToLong(Histogram.Bucket::getDocCount) - .sum(); - assertEquals(12, totalDocs); - - verifyBucketsOrdered(histogram.getBuckets()); - } - ); - } - } + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2020-01-01T00:00:00.000Z", 124); // 5 * 24 hours + 4 quarters + expectedDocCount.put("2021-01-01T00:00:00.000Z", 4); + expectedDocCount.put("2022-01-01T00:00:00.000Z", 4); + expectedDocCount.put("2023-01-01T00:00:00.000Z", 4); + expectedDocCount.put("2024-01-01T00:00:00.000Z", 4); - /** - * Helper method to verify that histogram buckets are in ascending chronological order. 
- */ - private void verifyBucketsOrdered(List buckets) { - for (int i = 1; i < buckets.size(); i++) { - ZonedDateTime prev = (ZonedDateTime) buckets.get(i - 1).getKey(); - ZonedDateTime curr = (ZonedDateTime) buckets.get(i).getKey(); - assertTrue( - "Buckets should be in ascending order: " + prev + " should be before " + curr, - prev.isBefore(curr) - ); - } + assertThat(bucketCountsAsMap(histogram), equalTo(expectedDocCount)); + }); } @Override From ac85f8ceb289191a2dc8a00c640b2f7bdddbe508 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Mon, 24 Nov 2025 09:53:57 -0800 Subject: [PATCH 3/9] Javadoc Signed-off-by: Asim Mahmood --- .../aggregations/bucket/HistogramSkiplistLeafCollector.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java index 32d2500c09253..6ca92114eef4e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -220,6 +220,11 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } } + /** + * Call back for auto date histogram + * + * @opensearch.internal + */ public interface IncreaseRoundingIfNeeded { void accept(long owningBucket, long rounded); } From 5f99c9dddf79c0124e249767535ede26270fecca Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Mon, 1 Dec 2025 22:28:43 -0800 Subject: [PATCH 4/9] Merge main Signed-off-by: Asim Mahmood # Conflicts: # server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java --- .../HistogramSkiplistLeafCollector.java | 20 ++++++ .../AutoDateHistogramAggregator.java | 67 +++++++++---------- .../histogram/DateHistogramAggregator.java | 20 +----- .../FilterRewriteSubAggTests.java | 13 +++- 4 files changed, 61 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java index 6ca92114eef4e..0adbef61c777a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -13,7 +13,10 @@ import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.Scorable; import org.opensearch.common.Rounding; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.AggregatorBase; import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import java.io.IOException; @@ -228,4 +231,21 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException public interface IncreaseRoundingIfNeeded { void accept(long owningBucket, long rounded); } + + /** + * Skiplist is based as top level agg (null parent) or parent that will execute in sorted order + * + */ + public static boolean canUseSkiplist(LongBounds hardBounds, Aggregator parent, DocValuesSkipper skipper, NumericDocValues singleton) { + if (skipper == null || singleton == null) return false; + // TODO: add hard bounds 
support + if (hardBounds != null) return false; + + if (parent == null) return true; + + if (parent instanceof AggregatorBase base) { + return base.getLeafCollectorMode() == AggregatorBase.LeafCollectionMode.FILTER_REWRITE; + } + return false; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 60aac6dee38aa..292d107db8aa0 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -427,22 +427,19 @@ protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, Do throws IOException { // Check if skiplist optimization is available final NumericDocValues singleton = DocValues.unwrapSingleton(values); - if (singleton != null && skipper != null) { - // FIXME: replace isTryPrecomputePath with collector mode - if (parent == null || isTryPrecomputePath()) { - // Increment skiplist collector count - skiplistCollectorCount++; - return new HistogramSkiplistLeafCollector( - singleton, - skipper, - (owningBucketOrd) -> preparedRounding, // for FromSingle there will be no parent/ - () -> bucketOrds, - sub, - FromSingle.this, - (owningBucket, rounded) -> increaseRoundingIfNeeded(rounded) // Pass supplier to allow rounding change - // detectionincreaseRoundingIfNeeded - ); - } + if (HistogramSkiplistLeafCollector.canUseSkiplist(null, parent, skipper, singleton)) { + // Increment skiplist collector count + skiplistCollectorCount++; + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + (owningBucketOrd) -> preparedRounding, // for FromSingle there will be no parent/ + () -> bucketOrds, + sub, + FromSingle.this, + (owningBucket, rounded) -> increaseRoundingIfNeeded(rounded) // Pass supplier to allow rounding change + // detectionincreaseRoundingIfNeeded + ); } // Fall back to standard LeafBucketCollectorBase when skiplist unavailable @@ -712,8 +709,7 @@ protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, Do throws IOException { final NumericDocValues singleton = DocValues.unwrapSingleton(values); - if (singleton != null && skipper != null) { - // FIXME: replace isTryPrecomputePath with collector mode + if (HistogramSkiplistLeafCollector.canUseSkiplist(null, parent, skipper, singleton)) { /** * HistogramSkiplistLeafCollector in its current state can only handle one owningBucketOrd at a time. * When parent is null, i.e. then ForSingle class will get used. ForMany is used when auto date is sub agg. @@ -723,25 +719,22 @@ protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, Do * In the future we can enhance HistogramSkiplistLeafCollector to handle multiple owningBucketOrd, * similar to FromMany. 
*/ - if (isTryPrecomputePath()) { - // Increment skiplist collector count - skiplistCollectorCount++; - - return new HistogramSkiplistLeafCollector( - singleton, - skipper, - (owningBucketOrd) -> preparedRoundings[roundingIndexFor(owningBucketOrd)], - () -> bucketOrds, - sub, - FromMany.this, - (owningBucketOrd, rounded) -> { - int roundingIdx = roundingIndexFor(owningBucketOrd); - liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1); - int estimatedBucketCount = liveBucketCountUnderestimate.increment(owningBucketOrd, 1); - increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); - } - ); - } + skiplistCollectorCount++; + + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + (owningBucketOrd) -> preparedRoundings[roundingIndexFor(owningBucketOrd)], + () -> bucketOrds, + sub, + FromMany.this, + (owningBucketOrd, rounded) -> { + int roundingIdx = roundingIndexFor(owningBucketOrd); + liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1); + int estimatedBucketCount = liveBucketCountUnderestimate.increment(owningBucketOrd, 1); + increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); + } + ); } return new LeafBucketCollectorBase(sub, values) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 42dfc30b0da63..eb7f167015d13 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -53,7 +53,6 @@ import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; -import org.opensearch.search.aggregations.AggregatorBase; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.CardinalityUpperBound; @@ -233,7 +232,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol final SortedNumericDocValues values = valuesSource.longValues(ctx); final NumericDocValues singleton = DocValues.unwrapSingleton(values); - if (canUseSkiplist(skipper, singleton)) { + if (HistogramSkiplistLeafCollector.canUseSkiplist(hardBounds, parent, skipper, singleton)) { skipListCollectorsUsed++; return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); } @@ -295,23 +294,6 @@ public void collectRange(int min, int max) throws IOException { }; } - /** - * Skiplist is based as top level agg (null parent) or parent that will execute in sorted order - * - */ - private boolean canUseSkiplist(DocValuesSkipper skipper, NumericDocValues singleton) { - if (skipper == null || singleton == null) return false; - // TODO: add hard bounds support - if (hardBounds != null) return false; - - if (parent == null) return true; - - if (parent instanceof AggregatorBase base) { - return base.getLeafCollectorMode() == LeafCollectionMode.FILTER_REWRITE; - } - return false; - } - private void collectValue(LeafBucketCollector sub, int doc, long owningBucketOrd, long rounded) throws IOException { if (hardBounds == null || hardBounds.contain(rounded)) { long bucketOrd = 
bucketOrds.add(owningBucketOrd, rounded); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java index 26c540451f2be..f61caca550f22 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java @@ -284,6 +284,13 @@ public void testRangeWithCard() throws IOException { /** * Test that verifies skiplist-based collection works correctly with range aggregations * that have date histogram sub-aggregations. + * + * This test exercises the following code paths: + * 1. HistogramSkiplistLeafCollector.collect() - skiplist-based document collection + * 2. HistogramSkiplistLeafCollector.advanceSkipper() - skiplist advancement with upToBucket logic + * 3. SubAggRangeCollector.collect() - sub-aggregation collection path + * + * The test uses: * - Index sort on date field to enable skiplist functionality * - Multiple segments created via explicit commits * - Searchable date field type @@ -331,7 +338,7 @@ public void testRangeDate() throws IOException { InternalRange.Bucket firstBucket = buckets.get(0); assertEquals(5, firstBucket.getDocCount()); InternalDateHistogram firstDate = firstBucket.getAggregations().get(dateAggName); - assertNotNull(firstDate); + assertNotNull("Sub-aggregation should be present (verifies SubAggRangeCollector.collect() was called)", firstDate); assertEquals(1, firstDate.getBuckets().size()); assertEquals(5, firstDate.getBuckets().get(0).getDocCount()); @@ -339,7 +346,7 @@ public void testRangeDate() throws IOException { InternalRange.Bucket secondBucket = buckets.get(1); assertEquals(8, secondBucket.getDocCount()); InternalDateHistogram secondDate = secondBucket.getAggregations().get(dateAggName); - assertNotNull(secondDate); + assertNotNull("Sub-aggregation should be present (verifies SubAggRangeCollector.collect() was called)", secondDate); assertEquals(2, secondDate.getBuckets().size()); assertEquals(5, secondDate.getBuckets().get(0).getDocCount()); assertEquals(3, secondDate.getBuckets().get(1).getDocCount()); @@ -348,7 +355,7 @@ public void testRangeDate() throws IOException { InternalRange.Bucket thirdBucket = buckets.get(2); assertEquals(7, thirdBucket.getDocCount()); InternalDateHistogram thirdDate = thirdBucket.getAggregations().get(dateAggName); - assertNotNull(thirdDate); + assertNotNull("Sub-aggregation should be present (verifies SubAggRangeCollector.collect() was called)", thirdDate); assertEquals(1, thirdDate.getBuckets().size()); assertEquals(7, thirdDate.getBuckets().get(0).getDocCount()); } From f6b756c6defa32536bc7529041dc6c0deee97a58 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Mon, 1 Dec 2025 23:05:23 -0800 Subject: [PATCH 5/9] Change log Signed-off-by: Asim Mahmood --- CHANGELOG.md | 2 +- .../aggregations/bucket/HistogramSkiplistLeafCollector.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fdedef54abed..97efcd3db6d2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,8 +70,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Refactor the Cache.CacheStats class to use the Builder pattern instead of constructors ([#20015](https://github.com/opensearch-project/OpenSearch/pull/20015)) - Refactor the HttpStats, ScriptStats, 
AdaptiveSelectionStats and OsStats class to use the Builder pattern instead of constructors ([#20014](https://github.com/opensearch-project/OpenSearch/pull/20014)) - Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059)) - - Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966)) +- Add skiplist optimization to auto_date_histogram aggregation ([#20057](https://github.com/opensearch-project/OpenSearch/pull/20057)) ### Fixed - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java index 0adbef61c777a..5bb20ced82bb5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -158,7 +158,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException { // Check if rounding changed (using reference equality) // AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes if (currentRounding != lastPreparedRounding) { - upToInclusive = -1; // Invalidate cache + upToInclusive = -1; // Invalidate upToSameBucket = false; lastPreparedRounding = currentRounding; } From 44bbcb3fb9eac4ac855c47a21aab7cae286840ee Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Mon, 1 Dec 2025 23:32:46 -0800 Subject: [PATCH 6/9] Minor changes Signed-off-by: Asim Mahmood --- .../bucket/HistogramSkiplistLeafCollector.java | 14 +++++--------- .../histogram/AutoDateHistogramAggregator.java | 1 - 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java index 5bb20ced82bb5..52547a0441425 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -35,6 +35,7 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector { private final NumericDocValues values; private final DocValuesSkipper skipper; private final LeafBucketCollector sub; + private final boolean isSubNoOp; private final BucketsAggregator aggregator; /** @@ -42,7 +43,7 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector { * This allows detection of rounding changes in AutoDateHistogramAggregator. 
*/ private final LongFunction preparedRoundingSupplier; - private final java.util.function.Supplier bucketOrdsSupplier; + private final Supplier bucketOrdsSupplier; private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded; /** @@ -75,13 +76,7 @@ public HistogramSkiplistLeafCollector( LeafBucketCollector sub, BucketsAggregator aggregator ) { - this.values = values; - this.skipper = skipper; - this.preparedRoundingSupplier = (owningBucketOrd) -> preparedRounding; - this.bucketOrdsSupplier = () -> bucketOrds; - this.sub = sub; - this.aggregator = aggregator; - this.increaseRoundingIfNeeded = (owningBucketOrd, rounded) -> {}; + this(values, skipper, (owningBucketOrd) -> preparedRounding, () -> bucketOrds, sub, aggregator, (owningBucketOrd, rounded) -> {}); } /** @@ -101,6 +96,7 @@ public HistogramSkiplistLeafCollector( this.preparedRoundingSupplier = preparedRoundingSupplier; this.bucketOrdsSupplier = bucketOrdsSupplier; this.sub = sub; + this.isSubNoOp = (sub == NO_OP_COLLECTOR); this.aggregator = aggregator; this.increaseRoundingIfNeeded = increaseRoundingIfNeeded; } @@ -199,7 +195,7 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } if (upToSameBucket) { - if (sub == NO_OP_COLLECTOR) { + if (isSubNoOp) { // stream.count maybe faster when we don't need to handle sub-aggs long count = stream.count(upToExclusive); aggregator.incrementBucketDocCount(upToBucketIndex, count); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 292d107db8aa0..f61acc95fbb8a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -438,7 +438,6 @@ protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, Do sub, FromSingle.this, (owningBucket, rounded) -> increaseRoundingIfNeeded(rounded) // Pass supplier to allow rounding change - // detectionincreaseRoundingIfNeeded ); } From 662378263e2aa03a8c90c91aea1d7577c71a0b45 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Fri, 5 Dec 2025 15:15:45 -0800 Subject: [PATCH 7/9] Add equivalence test case with random values Signed-off-by: Asim Mahmood --- .../AutoDateHistogramAggregatorTests.java | 99 ++++++++++++++++++- 1 file changed, 95 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 5eab3da2b32fe..dcedc4456d02d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -33,18 +33,22 @@ package org.opensearch.search.aggregations.bucket.histogram; import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import 
org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.util.BytesRef; @@ -95,6 +99,7 @@ import static org.hamcrest.Matchers.hasSize; public class AutoDateHistogramAggregatorTests extends DateHistogramAggregatorTestCase { + // @timestamp field name by default uses skip_list private static final String DATE_FIELD = "@timestamp"; private static final String INSTANT_FIELD = "instant"; private static final String NUMERIC_FIELD = "numeric"; @@ -1004,7 +1009,6 @@ private void testSearchCase( try (IndexReader indexReader = DirectoryReader.open(directory)) { final IndexSearcher indexSearcher = newSearcher(indexReader, true, true); - final AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name"); if (configure != null) { configure.accept(aggregationBuilder); @@ -1045,10 +1049,18 @@ private void indexSampleData(List dataset, RandomIndexWriter inde document.add(new LongPoint(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); + document.add(new StringField("filterField", "a", Field.Store.NO)); indexWriter.addDocument(document); document.clear(); i += 1; } + // delete a doc to avoid approx optimization + if (enableSkiplist) { + document.add(new StringField("someField", "a", Field.Store.NO)); + indexWriter.addDocument(document); + indexWriter.commit(); + indexWriter.deleteDocuments(new TermQuery(new Term("someField", "a"))); + } } private Map bucketCountsAsMap(InternalAutoDateHistogram result) { @@ -1078,7 +1090,6 @@ private Map maxAsMap(InternalAutoDateHistogram result) { * 3. owningBucketOrd is always 0 in FromSingle context * 4. Bucket merging works correctly after rounding change * - * Requirements: 3.1, 3.2, 3.4 */ public void testSkiplistCollectorWithRoundingChange() throws IOException { // Create a dataset that will trigger rounding changes @@ -1122,7 +1133,6 @@ public void testSkiplistCollectorWithRoundingChange() throws IOException { * Test that verifies skiplist collector handles rounding changes correctly with sub-aggregations. * This ensures that when rounding changes mid-collection, sub-aggregations still produce correct results. * - * Requirements: 3.1, 3.2, 3.4 */ public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException { // Create dataset that triggers rounding change @@ -1171,7 +1181,6 @@ public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException * Test that verifies bucket merging works correctly after rounding change. * This test creates a scenario where buckets must be merged when rounding increases. * - * Requirements: 3.2, 3.4 */ public void testSkiplistCollectorBucketMergingAfterRoundingChange() throws IOException { // Create dataset with fine-grained data that will be merged into coarser buckets @@ -1215,6 +1224,88 @@ public void testSkiplistCollectorBucketMergingAfterRoundingChange() throws IOExc }); } + /** + * Test that verifies skiplist and non-skiplist collectors produce identical results. 
+ * Uses random number of documents (20-200) and random date distribution. + */ + public void testSkiplistEquivalence() throws IOException { + // Generate random number of documents between 20 and 200 + final int numDocs = randomIntBetween(20, 200); + final List dataset = new ArrayList<>(numDocs); + + // Generate random dates spanning a year + final ZonedDateTime startDate = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + final long yearInMillis = 365L * 24 * 60 * 60 * 1000; + + for (int i = 0; i < numDocs; i++) { + long randomOffset = randomLongBetween(0, yearInMillis); + dataset.add(startDate.plusSeconds(randomOffset / 1000)); + } + + // Sort dataset to ensure consistent ordering + dataset.sort(ZonedDateTime::compareTo); + + // Test with random number of buckets + final int numBuckets = randomIntBetween(5, 20); + + // Run aggregation without skiplist + final InternalAutoDateHistogram histogramWithoutSkiplist = runAggregation(dataset, false, numBuckets); + + // Run aggregation with skiplist + final InternalAutoDateHistogram histogramWithSkiplist = runAggregation(dataset, true, numBuckets); + + // Verify both produce identical results + assertEquals( + "Bucket count mismatch between skiplist and non-skiplist", + histogramWithoutSkiplist.getBuckets().size(), + histogramWithSkiplist.getBuckets().size() + ); + + // Verify each bucket matches + for (int i = 0; i < histogramWithoutSkiplist.getBuckets().size(); i++) { + InternalAutoDateHistogram.Bucket bucketWithout = histogramWithoutSkiplist.getBuckets().get(i); + InternalAutoDateHistogram.Bucket bucketWith = histogramWithSkiplist.getBuckets().get(i); + + assertEquals("Bucket key mismatch at index " + i, bucketWithout.getKey(), bucketWith.getKey()); + + assertEquals( + "Doc count mismatch at index " + i + " for key " + bucketWithout.getKeyAsString(), + bucketWithout.getDocCount(), + bucketWith.getDocCount() + ); + } + + // Verify total doc counts match + long totalDocsWithout = histogramWithoutSkiplist.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + long totalDocsWith = histogramWithSkiplist.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + + assertEquals("Total doc count mismatch", totalDocsWithout, totalDocsWith); + assertEquals("Total doc count should match input", numDocs, totalDocsWithout); + } + + private InternalAutoDateHistogram runAggregation(List dataset, boolean enableSkiplist, int numBuckets) + throws IOException { + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexSampleData(dataset, indexWriter, enableSkiplist); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + final IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + final AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name") + .setNumBuckets(numBuckets) + .field(DATE_FIELD); + + final DateFieldMapper.DateFieldType fieldType = new DateFieldMapper.DateFieldType(DATE_FIELD); + MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(INSTANT_FIELD, NumberFieldMapper.NumberType.LONG); + MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NUMERIC_FIELD, NumberFieldMapper.NumberType.LONG); + + return searchAndReduce(indexSearcher, DEFAULT_QUERY, aggregationBuilder, fieldType, instantFieldType, numericFieldType); + } + } + } + @Override public void doAssertReducedMultiBucketConsumer(Aggregation agg, 
MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { /* From 350729d1a3df517c7a821ac42f496ba3cbab34db Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Fri, 5 Dec 2025 15:23:57 -0800 Subject: [PATCH 8/9] Remove unused field Signed-off-by: Asim Mahmood --- .../bucket/histogram/AutoDateHistogramAggregatorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index dcedc4456d02d..8cff2fd1f4e34 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -1049,7 +1049,6 @@ private void indexSampleData(List dataset, RandomIndexWriter inde document.add(new LongPoint(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); - document.add(new StringField("filterField", "a", Field.Store.NO)); indexWriter.addDocument(document); document.clear(); i += 1; From 9d011ca5c3e820d2fd598921c5aa7cfe1240e8b2 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Mon, 8 Dec 2025 10:10:29 -0800 Subject: [PATCH 9/9] Removed unnecessary code Signed-off-by: Asim Mahmood --- .../histogram/AutoDateHistogramAggregator.java | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index f61acc95fbb8a..57134d07d5021 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -280,23 +280,7 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc skipper = ctx.reader().getDocValuesSkipper(this.fieldName); } - final LeafBucketCollector iteratingCollector = getLeafCollector(values, skipper, sub); - return new LeafBucketCollectorBase(sub, values) { - @Override - public void collect(int doc, long owningBucketOrd) throws IOException { - iteratingCollector.collect(doc, owningBucketOrd); - } - - @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - iteratingCollector.collect(stream, owningBucketOrd); - } - - @Override - public void collectRange(int min, int max) throws IOException { - iteratingCollector.collectRange(min, max); - } - }; + return getLeafCollector(values, skipper, sub); } protected final InternalAggregation[] buildAggregations(
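
Editor's note on the rounding-change handling touched in patches 4 through 6 above: the collector compares the supplied Rounding.Prepared by reference and, when the parent aggregator has swapped in a new instance, throws away the cached skipper range before bucketing the next doc. The sketch below shows only that pattern in isolation; it is a simplified stand-in, not the OpenSearch collector, and LongUnaryOperator merely plays the role of Rounding.Prepared.

    import java.util.function.LongUnaryOperator;
    import java.util.function.Supplier;

    /**
     * Minimal stand-in for the cache-invalidation pattern; none of this is the OpenSearch API.
     */
    class RoundingChangeAwareCollector {
        private final Supplier<LongUnaryOperator> roundingSupplier; // parent may swap the rounding at any time
        private LongUnaryOperator lastRounding;                     // last reference observed
        private int upToInclusive = -1;                             // cached "all docs up to here share a bucket"
        private boolean upToSameBucket = false;

        RoundingChangeAwareCollector(Supplier<LongUnaryOperator> roundingSupplier) {
            this.roundingSupplier = roundingSupplier;
        }

        void collect(int doc, long value) {
            LongUnaryOperator current = roundingSupplier.get();
            // Reference comparison: a new prepared-rounding instance signals that the interval
            // changed, so anything derived with the old rounding is stale.
            if (current != lastRounding) {
                upToInclusive = -1;      // drop the cached skipper range
                upToSameBucket = false;  // it was computed with the previous rounding
                lastRounding = current;
            }
            if (doc > upToInclusive) {
                // the real collector re-reads the doc-values skipper here to refresh the cache
            }
            long bucketKey = current.applyAsLong(value);
            // ... look up or create the ordinal for bucketKey and count the doc ...
        }
    }
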
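Editor's note on the isSubNoOp field introduced in patch 6/9: precomputing whether the sub-collector is the no-op collector lets the bulk path count matching docs in one call instead of visiting each one. A rough sketch of that idea follows; DocStream is a hypothetical interface standing in for Lucene's DocIdStream, which offers comparable count/forEach operations.

    import java.util.function.IntConsumer;

    /** Hypothetical DocIdStream-like interface used only for illustration. */
    interface DocStream {
        int count(int upToExclusive) throws java.io.IOException;
        void forEach(int upToExclusive, IntConsumer consumer) throws java.io.IOException;
    }

    final class BulkCountSketch {
        /**
         * When every doc below upToExclusive maps to the same bucket and there is no
         * sub-aggregation, a single bulk count avoids a per-document virtual call.
         */
        static long addDocs(DocStream stream, boolean subIsNoOp, int upToExclusive, long bucketDocCount)
            throws java.io.IOException {
            if (subIsNoOp) {
                return bucketDocCount + stream.count(upToExclusive); // one call for the whole range
            }
            long[] total = { bucketDocCount };
            stream.forEach(upToExclusive, doc -> {
                total[0]++;
                // a real collector would also propagate the doc to sub.collect(doc, bucketIndex) here
            });
            return total[0];
        }
    }
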
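Editor's note on the test indexing helper changed in patch 7/9: indexSampleData adds a marker document and immediately deletes it so the segment carries deleted docs, which keeps the aggregator from taking the match-all shortcut. A stripped-down version of that trick with plain Lucene classes is shown below; the field name and value are arbitrary, and the tests themselves go through RandomIndexWriter rather than IndexWriter.

    import java.io.IOException;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.TermQuery;

    final class DeletedDocTrick {
        /** Adds a throw-away document and deletes it so the index reports deleted docs. */
        static void addAndDeleteMarker(IndexWriter writer) throws IOException {
            Document marker = new Document();
            marker.add(new StringField("marker", "tombstone", Field.Store.NO)); // arbitrary marker field
            writer.addDocument(marker);
            writer.commit();                                                    // flush so the doc exists in a segment
            writer.deleteDocuments(new TermQuery(new Term("marker", "tombstone")));
        }
    }
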