diff --git a/CHANGELOG.md b/CHANGELOG.md
index fff7635a6f300..9e11c75a8aee0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
 
 ### Changed
+- Combining filter rewrite and skip list to optimize sub aggregation ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
 - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
 - Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))
 - Omit maxScoreCollector in SimpleTopDocsCollectorContext when concurrent segment search enabled ([#19584](https://github.com/opensearch-project/OpenSearch/pull/19584))
diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java
index 42ba00e9182bf..c78d38032cb56 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java
@@ -73,6 +73,7 @@ public abstract class AggregatorBase extends Aggregator {
     private Map<String, Aggregator> subAggregatorbyName;
     private final CircuitBreakerService breakerService;
    private long requestBytesUsed;
+    protected LeafCollectionMode leafCollectorMode = LeafCollectionMode.NORMAL;
 
     /**
      * Constructs a new Aggregator.
@@ -236,6 +237,23 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
         return false;
     }
 
+    /**
+     * To be used in conjunction with the tryPrecomputeAggregationForLeaf()
+     * or getLeafCollector methods.
+     */
+    public LeafCollectionMode getLeafCollectorMode() {
+        return leafCollectorMode;
+    }
+
+    /**
+     * To be used in conjunction with the tryPrecomputeAggregationForLeaf()
+     * or getLeafCollector methods.
+     */
+    public enum LeafCollectionMode {
+        NORMAL,
+        FILTER_REWRITE
+    }
+
     @Override
     public final void preCollection() throws IOException {
         List<BucketCollector> collectors = Arrays.asList(subAggregators);
@@ -343,4 +361,5 @@ protected void checkCancelled() {
             throw new TaskCancelledException("The query has been cancelled");
         }
     }
+
 }
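The `AggregatorBase` change above is the coordination half of this optimization: a parent aggregator advertises, through a mode flag, that it is about to replay a leaf's docs in sorted doc-ID order, and sub-aggregators consult that flag before choosing a collection strategy. A minimal standalone sketch of the pattern follows; `SketchAggregator` and `SketchParent` are hypothetical stand-ins for `AggregatorBase` and `RangeAggregator`, not OpenSearch types:

```java
// Toy sketch of the mode handshake between a parent aggregator and its sub-aggregators.
abstract class SketchAggregator {
    enum LeafCollectionMode { NORMAL, FILTER_REWRITE }

    // Parents flip this while they replay a leaf's docs in sorted order.
    protected LeafCollectionMode leafCollectorMode = LeafCollectionMode.NORMAL;

    LeafCollectionMode getLeafCollectorMode() {
        return leafCollectorMode;
    }
}

class SketchParent extends SketchAggregator {
    boolean tryPrecomputeForLeaf(Runnable optimizedCollection) {
        try {
            // Any sub-aggregator that checks getLeafCollectorMode() during this call
            // knows docs will arrive in doc-ID order and may safely use a skip list.
            leafCollectorMode = LeafCollectionMode.FILTER_REWRITE;
            optimizedCollection.run();
            return true;
        } finally {
            leafCollectorMode = LeafCollectionMode.NORMAL; // always restore
        }
    }
}
```

The try/finally mirrors the RangeAggregator hunk further down: the flag is only meaningful for the duration of the precompute call and must not leak into normal collection.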
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java
new file mode 100644
index 0000000000000..acbc62cb18024
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java
@@ -0,0 +1,170 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.bucket;
+
+import org.apache.lucene.index.DocValuesSkipper;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.search.DocIdStream;
+import org.apache.lucene.search.Scorable;
+import org.opensearch.common.Rounding;
+import org.opensearch.search.aggregations.LeafBucketCollector;
+import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
+
+import java.io.IOException;
+
+/**
+ * Histogram collection logic using a skip list.
+ *
+ * @opensearch.internal
+ */
+public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
+
+    private final NumericDocValues values;
+    private final DocValuesSkipper skipper;
+    private final Rounding.Prepared preparedRounding;
+    private final LongKeyedBucketOrds bucketOrds;
+    private final LeafBucketCollector sub;
+    private final BucketsAggregator aggregator;
+
+    /**
+     * Max doc ID (inclusive) up to which the values of all docs may map to the same
+     * bucket.
+     */
+    private int upToInclusive = -1;
+
+    /**
+     * Whether the values of all docs up to {@link #upToInclusive} map to the same bucket.
+     */
+    private boolean upToSameBucket;
+
+    /**
+     * Index in bucketOrds for docs up to {@link #upToInclusive}.
+     */
+    private long upToBucketIndex;
+
+    public HistogramSkiplistLeafCollector(
+        NumericDocValues values,
+        DocValuesSkipper skipper,
+        Rounding.Prepared preparedRounding,
+        LongKeyedBucketOrds bucketOrds,
+        LeafBucketCollector sub,
+        BucketsAggregator aggregator
+    ) {
+        this.values = values;
+        this.skipper = skipper;
+        this.preparedRounding = preparedRounding;
+        this.bucketOrds = bucketOrds;
+        this.sub = sub;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public void setScorer(Scorable scorer) throws IOException {
+        if (sub != null) {
+            sub.setScorer(scorer);
+        }
+    }
+
+    private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
+        if (doc > skipper.maxDocID(0)) {
+            skipper.advance(doc);
+        }
+        upToSameBucket = false;
+
+        if (skipper.minDocID(0) > doc) {
+            // Corner case which happens if `doc` doesn't have a value and is between two
+            // intervals of the doc-value skip index.
+            upToInclusive = skipper.minDocID(0) - 1;
+            return;
+        }
+
+        upToInclusive = skipper.maxDocID(0);
+
+        // Now find the highest level where all docs map to the same bucket.
+        for (int level = 0; level < skipper.numLevels(); ++level) {
+            int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
+            long minBucket = preparedRounding.round(skipper.minValue(level));
+            long maxBucket = preparedRounding.round(skipper.maxValue(level));
+
+            if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
+                // All docs at this level have a value, and all values map to the same bucket.
+                upToInclusive = skipper.maxDocID(level);
+                upToSameBucket = true;
+                upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
+                if (upToBucketIndex < 0) {
+                    upToBucketIndex = -1 - upToBucketIndex;
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void collect(int doc, long owningBucketOrd) throws IOException {
+        if (doc > upToInclusive) {
+            advanceSkipper(doc, owningBucketOrd);
+        }
+
+        if (upToSameBucket) {
+            aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
+            sub.collect(doc, upToBucketIndex);
+        } else if (values.advanceExact(doc)) {
+            final long value = values.longValue();
+            long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
+            if (bucketIndex < 0) {
+                bucketIndex = -1 - bucketIndex;
+                aggregator.collectExistingBucket(sub, doc, bucketIndex);
+            } else {
+                aggregator.collectBucket(sub, doc, bucketIndex);
+            }
+        }
+    }
+
+    @Override
+    public void collect(DocIdStream stream) throws IOException {
+        // This will only be called if this is the top-level agg
+        collect(stream, 0);
+    }
+
+    @Override
+    public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
+        // This will only be called if this is a sub-aggregation
+        for (;;) {
+            int upToExclusive = upToInclusive + 1;
+            if (upToExclusive < 0) { // overflow
+                upToExclusive = Integer.MAX_VALUE;
+            }
+
+            if (upToSameBucket) {
+                if (sub == NO_OP_COLLECTOR) {
+                    // stream.count may be faster when we don't need to handle sub-aggs
+                    long count = stream.count(upToExclusive);
+                    aggregator.incrementBucketDocCount(upToBucketIndex, count);
+                } else {
+                    final int[] count = { 0 };
+                    stream.forEach(upToExclusive, doc -> {
+                        sub.collect(doc, upToBucketIndex);
+                        count[0]++;
+                    });
+                    aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
+                }
+            } else {
+                stream.forEach(upToExclusive, doc -> collect(doc, owningBucketOrd));
+            }
+
+            if (stream.mayHaveRemaining()) {
+                advanceSkipper(upToExclusive, owningBucketOrd);
+            } else {
+                break;
+            }
+        }
+    }
+}
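The heart of `advanceSkipper` deserves a gloss: each skipper level summarizes a contiguous doc-ID range with its min/max value and doc count, so when both extremes round to the same bucket key and every doc in the range has a value, the whole range can be credited to one bucket without reading per-document values. A self-contained toy model of that check; the `Level` record and `roundToDay` are illustrative stand-ins, not Lucene's `DocValuesSkipper` or OpenSearch's `Rounding.Prepared`:

```java
// Toy model of the per-level check used by advanceSkipper.
import java.util.concurrent.TimeUnit;

public class SkipLevelCheck {
    /** Hypothetical level summary: docs [minDoc, maxDoc] with values in [minValue, maxValue]. */
    record Level(int minDoc, int maxDoc, long minValue, long maxValue, int docCount) {}

    static long roundToDay(long millis) {
        return millis - Math.floorMod(millis, TimeUnit.DAYS.toMillis(1));
    }

    public static void main(String[] args) {
        // 1000 docs whose timestamps all fall within one hour of each other.
        Level level = new Level(0, 999, 1_700_000_000_000L, 1_700_003_600_000L, 1000);
        int totalDocsAtLevel = level.maxDoc() - level.minDoc() + 1;
        long minBucket = roundToDay(level.minValue());
        long maxBucket = roundToDay(level.maxValue());
        if (level.docCount() == totalDocsAtLevel && minBucket == maxBucket) {
            // Every doc in the range has a value and both extremes land in the same
            // day bucket, so all 1000 docs can be bulk-counted into that bucket.
            System.out.println(totalDocsAtLevel + " docs -> bucket " + minBucket);
        } else {
            System.out.println("fall back to per-document collection");
        }
    }
}
```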
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index cd77a17d31815..42dfc30b0da63 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -39,7 +39,6 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.DocIdStream;
-import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.CollectionUtil;
 import org.opensearch.common.Nullable;
@@ -54,6 +53,7 @@
 import org.opensearch.index.mapper.CompositeDataCubeFieldType;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
+import org.opensearch.search.aggregations.AggregatorBase;
 import org.opensearch.search.aggregations.AggregatorFactories;
 import org.opensearch.search.aggregations.BucketOrder;
 import org.opensearch.search.aggregations.CardinalityUpperBound;
@@ -63,6 +63,7 @@
 import org.opensearch.search.aggregations.StarTreeBucketCollector;
 import org.opensearch.search.aggregations.StarTreePreComputeCollector;
 import org.opensearch.search.aggregations.bucket.BucketsAggregator;
+import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector;
 import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
 import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
 import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
@@ -232,12 +233,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
         final SortedNumericDocValues values = valuesSource.longValues(ctx);
         final NumericDocValues singleton = DocValues.unwrapSingleton(values);
 
-        if (skipper != null && singleton != null) {
-            // TODO: add hard bounds support
-            if (hardBounds == null && parent == null) {
-                skipListCollectorsUsed++;
-                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this);
-            }
+        if (canUseSkiplist(skipper, singleton)) {
+            skipListCollectorsUsed++;
+            return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this);
         }
 
         if (singleton != null) {
@@ -297,6 +295,23 @@ public void collectRange(int min, int max) throws IOException {
         };
     }
 
+    /**
+     * Skip list collection can be used when this is the top-level agg (null parent) or the
+     * parent will execute in sorted doc-ID order (filter rewrite).
+     */
+    private boolean canUseSkiplist(DocValuesSkipper skipper, NumericDocValues singleton) {
+        if (skipper == null || singleton == null) return false;
+        // TODO: add hard bounds support
+        if (hardBounds != null) return false;
+
+        if (parent == null) return true;
+
+        if (parent instanceof AggregatorBase base) {
+            return base.getLeafCollectorMode() == LeafCollectionMode.FILTER_REWRITE;
+        }
+        return false;
+    }
+
     private void collectValue(LeafBucketCollector sub, int doc, long owningBucketOrd, long rounded) throws IOException {
         if (hardBounds == null || hardBounds.contain(rounded)) {
             long bucketOrd = bucketOrds.add(owningBucketOrd, rounded);
@@ -454,149 +469,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
             return 1.0;
         }
     }
-
-    private static class HistogramSkiplistLeafCollector extends LeafBucketCollector {
-
-        private final NumericDocValues values;
-        private final DocValuesSkipper skipper;
-        private final Rounding.Prepared preparedRounding;
-        private final LongKeyedBucketOrds bucketOrds;
-        private final LeafBucketCollector sub;
-        private final BucketsAggregator aggregator;
-
-        /**
-         * Max doc ID (inclusive) up to which all docs values may map to the same bucket.
-         */
-        private int upToInclusive = -1;
-
-        /**
-         * Whether all docs up to {@link #upToInclusive} values map to the same bucket.
-         */
-        private boolean upToSameBucket;
-
-        /**
-         * Index in bucketOrds for docs up to {@link #upToInclusive}.
-         */
-        private long upToBucketIndex;
-
-        HistogramSkiplistLeafCollector(
-            NumericDocValues values,
-            DocValuesSkipper skipper,
-            Rounding.Prepared preparedRounding,
-            LongKeyedBucketOrds bucketOrds,
-            LeafBucketCollector sub,
-            BucketsAggregator aggregator
-        ) {
-            this.values = values;
-            this.skipper = skipper;
-            this.preparedRounding = preparedRounding;
-            this.bucketOrds = bucketOrds;
-            this.sub = sub;
-            this.aggregator = aggregator;
-        }
-
-        @Override
-        public void setScorer(Scorable scorer) throws IOException {
-            if (sub != null) {
-                sub.setScorer(scorer);
-            }
-        }
-
-        private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
-            if (doc > skipper.maxDocID(0)) {
-                skipper.advance(doc);
-            }
-            upToSameBucket = false;
-
-            if (skipper.minDocID(0) > doc) {
-                // Corner case which happens if `doc` doesn't have a value and is between two intervals of
-                // the doc-value skip index.
-                upToInclusive = skipper.minDocID(0) - 1;
-                return;
-            }
-
-            upToInclusive = skipper.maxDocID(0);
-
-            // Now find the highest level where all docs map to the same bucket.
-            for (int level = 0; level < skipper.numLevels(); ++level) {
-                int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
-                long minBucket = preparedRounding.round(skipper.minValue(level));
-                long maxBucket = preparedRounding.round(skipper.maxValue(level));
-
-                if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
-                    // All docs at this level have a value, and all values map to the same bucket.
-                    upToInclusive = skipper.maxDocID(level);
-                    upToSameBucket = true;
-                    upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
-                    if (upToBucketIndex < 0) {
-                        upToBucketIndex = -1 - upToBucketIndex;
-                    }
-                } else {
-                    break;
-                }
-            }
-        }
-
-        @Override
-        public void collect(int doc, long owningBucketOrd) throws IOException {
-            if (doc > upToInclusive) {
-                advanceSkipper(doc, owningBucketOrd);
-            }
-
-            if (upToSameBucket) {
-                aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
-                sub.collect(doc, upToBucketIndex);
-            } else if (values.advanceExact(doc)) {
-                final long value = values.longValue();
-                long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
-                if (bucketIndex < 0) {
-                    bucketIndex = -1 - bucketIndex;
-                    aggregator.collectExistingBucket(sub, doc, bucketIndex);
-                } else {
-                    aggregator.collectBucket(sub, doc, bucketIndex);
-                }
-            }
-        }
-
-        @Override
-        public void collect(int doc) throws IOException {
-            collect(doc, 0);
-        }
-
-        @Override
-        public void collect(DocIdStream stream) throws IOException {
-            // This will only be called if its the top agg
-            for (;;) {
-                int upToExclusive = upToInclusive + 1;
-                if (upToExclusive < 0) { // overflow
-                    upToExclusive = Integer.MAX_VALUE;
-                }
-
-                if (upToSameBucket) {
-                    if (sub == NO_OP_COLLECTOR) {
-                        // stream.count maybe faster when we don't need to handle sub-aggs
-                        long count = stream.count(upToExclusive);
-                        aggregator.incrementBucketDocCount(upToBucketIndex, count);
-                    } else {
-                        final int[] count = { 0 };
-                        stream.forEach(upToExclusive, doc -> {
-                            sub.collect(doc, upToBucketIndex);
-                            count[0]++;
-                        });
-                        aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
-                    }
-
-                } else {
-                    stream.forEach(upToExclusive, this::collect);
-                }
-
-                if (stream.mayHaveRemaining()) {
-                    advanceSkipper(upToExclusive, 0);
-                } else {
-                    break;
-                }
-            }
-        }
-
-    }
 }
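The new `canUseSkiplist` gate reduces to a small predicate over four facts. A condensed standalone sketch (hypothetical `SkiplistGate`/`Mode` names, not the OpenSearch classes) makes the conditions and their rationale explicit:

```java
// Standalone sketch of the gating logic: the skip-list collector needs single-valued
// doc values with a skipper, no hard bounds, and either no parent or a parent that
// replays docs in sorted order via filter rewrite.
final class SkiplistGate {
    enum Mode { NORMAL, FILTER_REWRITE }

    static boolean canUseSkiplist(boolean hasSkipper, boolean singleValued,
                                  boolean hasHardBounds, Mode parentMode /* null = top level */) {
        if (!hasSkipper || !singleValued) return false; // skipper + singleton doc values required
        if (hasHardBounds) return false;                // hard bounds not yet supported upstream
        if (parentMode == null) return true;            // top-level agg sees docs in order
        return parentMode == Mode.FILTER_REWRITE;       // parent guarantees sorted replay
    }

    public static void main(String[] args) {
        System.out.println(canUseSkiplist(true, true, false, null));                // true
        System.out.println(canUseSkiplist(true, true, false, Mode.NORMAL));         // false
        System.out.println(canUseSkiplist(true, true, false, Mode.FILTER_REWRITE)); // true
    }
}
```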
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java
index d4448cc3985d0..aad698a7a2ba8 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java
@@ -335,8 +335,13 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
             return true;
         }
 
-        return segmentMatchAll(context, ctx)
-            && filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, true, collectableSubAggregators);
+        try {
+            leafCollectorMode = LeafCollectionMode.FILTER_REWRITE;
+            return segmentMatchAll(context, ctx)
+                && filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, true, collectableSubAggregators);
+        } finally {
+            leafCollectorMode = LeafCollectionMode.NORMAL;
+        }
     }
 
     @Override
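Before the tests, one runtime detail worth illustrating: when the filter-rewrite parent replays a segment it feeds the sub-collector a `DocIdStream`, and `HistogramSkiplistLeafCollector` chooses between a cheap `count` and a per-doc `forEach` depending on whether a sub-aggregation must see every doc. Here is a toy stream (not Lucene's `DocIdStream`) showing the two paths:

```java
// Toy DocIdStream illustrating why the collector prefers stream.count over
// stream.forEach when there is no sub-aggregation.
import java.util.function.IntConsumer;

public class ToyDocIdStream {
    private final int[] docs;
    private int pos;

    ToyDocIdStream(int... docs) {
        this.docs = docs;
    }

    /** Bulk path: just counts matching docs, no per-doc callback overhead. */
    int count(int upToExclusive) {
        int n = 0;
        while (pos < docs.length && docs[pos] < upToExclusive) {
            pos++;
            n++;
        }
        return n;
    }

    /** Per-doc path: required when a sub-aggregation must see each doc ID. */
    void forEach(int upToExclusive, IntConsumer consumer) {
        while (pos < docs.length && docs[pos] < upToExclusive) {
            consumer.accept(docs[pos++]);
        }
    }

    boolean mayHaveRemaining() {
        return pos < docs.length;
    }

    public static void main(String[] args) {
        ToyDocIdStream stream = new ToyDocIdStream(1, 3, 7, 12, 20);
        System.out.println(stream.count(10)); // 3 docs below 10, one bulk increment
        stream.forEach(Integer.MAX_VALUE, d -> System.out.println("replay doc " + d));
    }
}
```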
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
index c59fadee03633..26c540451f2be 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
@@ -12,23 +12,34 @@
 import org.apache.lucene.document.KeywordField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.util.TestUtil;
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
 import org.opensearch.core.common.breaker.CircuitBreaker;
 import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.fielddata.IndexNumericFieldData;
 import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.index.mapper.KeywordFieldMapper;
 import org.opensearch.index.mapper.NumberFieldMapper;
 import org.opensearch.index.mapper.ParseContext;
+import org.opensearch.search.MultiValueMode;
 import org.opensearch.search.aggregations.AggregationBuilder;
 import org.opensearch.search.aggregations.AggregationBuilders;
 import org.opensearch.search.aggregations.Aggregator;
@@ -270,6 +281,80 @@ public void testRangeWithCard() throws IOException {
         assertEquals(2, thirdCardinality.getValue(), 0);
     }
 
+    /**
+     * Test that verifies skiplist-based collection works correctly with range aggregations
+     * that have date histogram sub-aggregations.
+     * - Index sort on the date field to enable skiplist functionality
+     * - Multiple segments created via explicit commits
+     * - Searchable date field type
+     * - Documents distributed across multiple date ranges (2015-2017)
+     */
+    public void testRangeDate() throws IOException {
+        // Setup index with skiplist configuration
+        Settings settings = getSettingsWithIndexSort();
+        IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
+        IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
+
+        // Create searchable date field type (isSearchable=true) to enable skiplist
+        DateFieldMapper.DateFieldType searchableDateFieldType = aggregableDateFieldType(false, true);
+
+        // Use custom index setup instead of the setupIndex() method
+        try (Directory directory = newDirectory()) {
+            // Index documents in batches with commits to create multiple segments
+            indexDocsForSkiplist(directory, searchableDateFieldType);
+
+            try (DirectoryReader indexReader = DirectoryReader.open(directory)) {
+                // Verify we have multiple segments (required for skiplist testing)
+                assertTrue("Should have multiple segments for skiplist testing", indexReader.leaves().size() > 1);
+
+                // Create IndexSearcher with the reader
+                IndexSearcher indexSearcher = new IndexSearcher(indexReader);
+
+                // Build RangeAggregationBuilder with a DateHistogramAggregationBuilder sub-aggregation.
+                // Use YEAR interval to align with our test data structure (2015, 2016, 2017)
+                RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName)
+                    .addRange(1, 2)
+                    .addRange(2, 4)
+                    .addRange(4, 6)
+                    .subAggregation(
+                        new DateHistogramAggregationBuilder(dateAggName).field(dateFieldName).calendarInterval(DateHistogramInterval.YEAR)
+                    );
+
+                // Execute aggregation on the reader with IndexSettings (enables skiplist)
+                InternalRange<?, ?> result = executeAggregationOnReader(indexReader, rangeAggregationBuilder, indexSettings);
+
+                // Verify results - this confirms skiplist collection worked correctly
+                List<? extends InternalRange.Bucket> buckets = result.getBuckets();
+                assertEquals(3, buckets.size());
+
+                // Range bucket 1: expect 5 docs, 1 date histogram bucket (2015)
+                InternalRange.Bucket firstBucket = buckets.get(0);
+                assertEquals(5, firstBucket.getDocCount());
+                InternalDateHistogram firstDate = firstBucket.getAggregations().get(dateAggName);
+                assertNotNull(firstDate);
+                assertEquals(1, firstDate.getBuckets().size());
+                assertEquals(5, firstDate.getBuckets().get(0).getDocCount());
+
+                // Range bucket 2: expect 8 docs, 2 date histogram buckets (2015, 2016)
+                InternalRange.Bucket secondBucket = buckets.get(1);
+                assertEquals(8, secondBucket.getDocCount());
+                InternalDateHistogram secondDate = secondBucket.getAggregations().get(dateAggName);
+                assertNotNull(secondDate);
+                assertEquals(2, secondDate.getBuckets().size());
+                assertEquals(5, secondDate.getBuckets().get(0).getDocCount());
+                assertEquals(3, secondDate.getBuckets().get(1).getDocCount());
+
+                // Range bucket 3: expect 7 docs, 1 date histogram bucket (2017)
+                InternalRange.Bucket thirdBucket = buckets.get(2);
+                assertEquals(7, thirdBucket.getDocCount());
+                InternalDateHistogram thirdDate = thirdBucket.getAggregations().get(dateAggName);
+                assertNotNull(thirdDate);
+                assertEquals(1, thirdDate.getBuckets().size());
+                assertEquals(7, thirdDate.getBuckets().get(0).getDocCount());
+            }
+        }
+    }
+
     public void testDateHisto() throws IOException {
         DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder(dateAggName).field(
             dateFieldName
@@ -543,13 +628,21 @@ private Directory setupIndex(List<ParseContext.Document> docs, boolean random) throws IOExcept
     private <IA extends InternalAggregation> IA executeAggregationOnReader(
         DirectoryReader indexReader,
         AggregationBuilder aggregationBuilder
+    ) throws IOException {
+        return executeAggregationOnReader(indexReader, aggregationBuilder, null);
+    }
+
+    private <IA extends InternalAggregation> IA executeAggregationOnReader(
+        DirectoryReader indexReader,
+        AggregationBuilder aggregationBuilder,
+        IndexSettings indexSettings
     ) throws IOException {
         IndexSearcher indexSearcher = new IndexSearcher(indexReader);
         MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = createBucketConsumer();
         SearchContext searchContext = createSearchContext(
             indexSearcher,
-            createIndexSettings(),
+            indexSettings != null ? indexSettings : createIndexSettings(),
             matchAllQuery,
             bucketConsumer,
             longFieldType,
@@ -645,7 +738,7 @@ private static class SubAggToVerify {
 
     protected final DateFieldMapper.DateFieldType aggregableDateFieldType(boolean useNanosecondResolution, boolean isSearchable) {
         return new DateFieldMapper.DateFieldType(
-            "timestamp",
+            dateFieldName,
             isSearchable,
             false,
             true,
@@ -655,4 +748,97 @@ protected final DateFieldMapper.DateFieldType aggregableDateFieldType(boolean us
             Collections.emptyMap()
         );
     }
+
+    /**
+     * Helper method to create Settings with an index sort on the date field for skiplist testing
+     */
+    private Settings getSettingsWithIndexSort() {
+        return Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .putList("index.sort.field", dateFieldName)
+            .build();
+    }
+
+    /**
+     * Helper method to index documents in batches with commits, building the multi-segment
+     * structure that skiplist collection relies on
+     */
+    private void indexDocsForSkiplist(Directory directory, DateFieldMapper.DateFieldType dateFieldType) throws IOException {
+        IndexWriterConfig config = new IndexWriterConfig();
+        config.setMergePolicy(NoMergePolicy.INSTANCE);
+
+        // Create sort field for index sort
+        IndexNumericFieldData fieldData = (IndexNumericFieldData) dateFieldType.fielddataBuilder("index", () -> {
+            throw new UnsupportedOperationException();
+        }).build(null, null);
+        SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false);
+        config.setIndexSort(new Sort(sortField));
+
+        try (IndexWriter indexWriter = new IndexWriter(directory, config)) {
+            // First commit - documents for range bucket 1 (metric values 1-2)
+            // Dates: 2015 (5 docs with metric=1)
+            for (int i = 0; i < 5; i++) {
+                org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
+                long timestamp = asLong("2015-02-13T13:09:32Z", dateFieldType);
+                doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp));
+                doc.add(new LongPoint(dateFieldName, timestamp));
+                doc.add(new NumericDocValuesField(longFieldName, 1));
+                doc.add(new LongPoint(longFieldName, 1));
+                indexWriter.addDocument(doc);
+            }
+            indexWriter.commit();
+
+            // Second commit - documents for range bucket 2 (metric values 2-4)
+            // Dates: 2015-2016 (5 docs with metric=2, 3 docs with metric=3)
+            for (int i = 0; i < 5; i++) {
+                org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
+                long timestamp = asLong("2015-11-13T16:14:34Z", dateFieldType);
+                doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp));
+                doc.add(new LongPoint(dateFieldName, timestamp));
+                doc.add(new NumericDocValuesField(longFieldName, 2));
+                doc.add(new LongPoint(longFieldName, 2));
+                indexWriter.addDocument(doc);
+            }
+            for (int i = 0; i < 3; i++) {
+                org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
+                long timestamp = asLong("2016-03-04T17:09:50Z", dateFieldType);
+                doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp));
+                doc.add(new LongPoint(dateFieldName, timestamp));
+                doc.add(new NumericDocValuesField(longFieldName, 3));
+                doc.add(new LongPoint(longFieldName, 3));
+                indexWriter.addDocument(doc);
+            }
+            indexWriter.commit();
+
+            // Third commit - documents for range bucket 3 (metric values 4-6)
+            // Dates: 2017 (4 docs with metric=4, 3 docs with metric=5)
+            for (int i = 0; i < 4; i++) {
+                org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
+                long timestamp = asLong("2017-12-12T22:55:46Z", dateFieldType);
+                doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp));
+                doc.add(new LongPoint(dateFieldName, timestamp));
+                doc.add(new NumericDocValuesField(longFieldName, 4));
+                doc.add(new LongPoint(longFieldName, 4));
+                indexWriter.addDocument(doc);
+            }
+            for (int i = 0; i < 3; i++) {
+                org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
+                long timestamp = asLong("2017-12-12T22:55:46Z", dateFieldType);
+                doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp));
+                doc.add(new LongPoint(dateFieldName, timestamp));
+                doc.add(new NumericDocValuesField(longFieldName, 5));
+                doc.add(new LongPoint(longFieldName, 5));
+                indexWriter.addDocument(doc);
+            }
+            indexWriter.commit();
+        }
+    }
+
+    /**
+     * Helper method to parse date strings to long values
+     */
+    private long asLong(String dateTime, DateFieldMapper.DateFieldType fieldType) {
+        return fieldType.parse(dateTime);
+    }
 }
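Finally, for experimenting with the underlying Lucene primitive outside this test harness, here is a minimal sketch, assuming Lucene 10.x APIs (`SortedNumericDocValuesField.indexedField` as used in the patch, plus `LeafReader#getDocValuesSkipper`), of the setup the test arranges: an index sorted on a long field whose doc values carry a skip index. The field name `ts` and `ByteBuffersDirectory` are illustrative choices, not anything from the PR.

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class SkipperDemo {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            IndexWriterConfig config = new IndexWriterConfig();
            // Sort the segment by "ts" so value ranges correlate with doc-ID ranges.
            config.setIndexSort(new Sort(new SortedNumericSortField("ts", SortField.Type.LONG)));
            try (IndexWriter writer = new IndexWriter(dir, config)) {
                for (long ts = 0; ts < 1_000; ts++) {
                    Document doc = new Document();
                    // indexedField records a doc-values skip index alongside the values
                    doc.add(SortedNumericDocValuesField.indexedField("ts", ts));
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                for (LeafReaderContext leaf : reader.leaves()) {
                    // null if the field carries no skip index
                    DocValuesSkipper skipper = leaf.reader().getDocValuesSkipper("ts");
                    skipper.advance(0); // position on the first interval
                    System.out.println("levels=" + skipper.numLevels()
                        + " level0 docs=[" + skipper.minDocID(0) + "," + skipper.maxDocID(0) + "]"
                        + " values=[" + skipper.minValue(0) + "," + skipper.maxValue(0) + "]");
                }
            }
        }
    }
}
```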