diff --git a/CHANGELOG.md b/CHANGELOG.md index a48e9bc010fb1..5002508eb3282 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157)) - Add support for missing proto fields in GRPC FunctionScore and Highlight ([#20169](https://github.com/opensearch-project/OpenSearch/pull/20169)) - Ensure all modules are included in INTEG_TEST testcluster distribution ([#20241](https://github.com/opensearch-project/OpenSearch/pull/20241)) +- Enable skiplist based collection using collectRange ([#20268](https://github.com/opensearch-project/OpenSearch/pull/20268)) ### Fixed - Fix bug of warm index: FullFileCachedIndexInput was closed error ([#20055](https://github.com/opensearch-project/OpenSearch/pull/20055)) diff --git a/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java b/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java index b304602965ae2..a2b05c6872566 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java +++ b/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java @@ -121,8 +121,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index a196549b41a51..30a0ac45fe11e 100644 
--- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java
@@ -131,24 +131,32 @@ public void collect(DocIdStream stream) throws IOException {
         collect(stream, 0);
     }
 
+    @Override
+    public void collectRange(int min, int max) throws IOException {
+        collectRange(min, max, 0);
+    }
+
     /**
-     * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code
-     * max} is guaranteed to be greater than {@code min}.
+     * Collect a range of doc IDs into {@code bucket}, between {@code min} inclusive and {@code max} exclusive.
+     * {@code max} is guaranteed to be greater than {@code min}.
      *
      * <p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
      * in a {@link DocValuesSkipper}.
      *
-     * <p>The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that
-     * matches the given range.
+     * <p>The default implementation calls {@link #collect(int, long)} for every doc ID between
+     * {@code min} inclusive and {@code max} exclusive.
      *
-     * @see #collect(int,long)
+     * @param min first doc ID of the range, inclusive
+     * @param max end of the range, exclusive
+     * @param bucket bucket ordinal handed to {@link #collect(int, long)} for each doc ID
+     * @see #collect(int, long)
      */
-    @Override
-    public void collectRange(int min, int max) throws IOException {
+    @ExperimentalApi
+    public void collectRange(int min, int max, long bucket) throws IOException {
         // Different aggregator implementations should override this method even if to just delegate to super for
         // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic.
         for (int docId = min; docId < max; docId++) {
-            collect(docId, 0);
+            collect(docId, bucket);
         }
     }
 
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java
index 52547a0441425..c8373dd2bf916 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java
@@ -180,12 +180,6 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
         }
     }
 
-    @Override
-    public void collect(DocIdStream stream) throws IOException {
-        // This will only be called if its the top agg
-        collect(stream, 0);
-    }
-
     @Override
     public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
         for (;;) {
@@ -219,6 +213,44 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
         }
     }
 
+    /**
+     * Collects the doc IDs in {@code [min, max)} for {@code bucket}, one skipper interval at a time.
+     * While {@code upToSameBucket} is set, the slice up to {@code upToInclusive} is recorded as a single
+     * doc-count increment on {@code upToBucketIndex} and, unless {@code isSubNoOp}, forwarded to the
+     * sub-collector as a range; otherwise every doc ID goes through {@link #collect(int, long)}.
+     */
+    @Override
+    public void collectRange(int min, int max, long bucket) throws IOException {
+        if (upToInclusive < min) {
+            advanceSkipper(min, bucket);
+        }
+
+        for (;;) {
+            int upToExclusive = Integer.min(upToInclusive + 1, max); // Don't collect past max
+            if (upToExclusive < 0) { // upToInclusive + 1 overflowed
+                upToExclusive = max; // the interval reaches past max, so the clamp still applies
+            }
+
+            if (upToSameBucket) {
+                aggregator.incrementBucketDocCount(upToBucketIndex, upToExclusive - min);
+                if (isSubNoOp == false) {
+                    sub.collectRange(min, upToExclusive, upToBucketIndex);
+                }
+            } else {
+                for (int docId = min; docId < upToExclusive; docId++) {
+                    collect(docId, bucket);
+                }
+            }
+
+            if (upToExclusive < max) {
+                min = upToExclusive;
+                advanceSkipper(min, bucket);
+            } else {
+                break;
+            }
+        }
+    }
+
     /**
      * Call back for auto date histogram
      *
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index a64abfc27748a..6cdeebeeac542 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -644,8 +644,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
             }
 
             @Override
-            public void collectRange(int min, int max) throws IOException {
-                super.collectRange(min, max);
+            public void collectRange(int min, int max, long bucket) throws IOException {
+                super.collectRange(min, max, bucket);
             }
         };
     }
@@ -683,8 +683,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
             }
 
             @Override
-            public void collectRange(int min, int max) throws IOException {
-                super.collectRange(min, max);
+            public void collectRange(int min, int max, long bucket) throws IOException {
+                super.collectRange(min, max, bucket);
             }
         };
     }
@@ -752,8 +752,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
             }
 
             @Override
-            public void collectRange(int min, int max) throws IOException {
-                super.collectRange(min, max);
+            public void collectRange(int min, int max, long bucket) throws IOException {
+                super.collectRange(min, max, bucket);
             }
         };
     }
diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregator.java index a9388d30ccc51..d28f0b8af812a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregator.java @@ -90,8 +90,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java index d5d1b68753364..7699c445fde7f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java @@ -199,8 +199,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 57134d07d5021..7869e6a6f4c7c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -454,8 +454,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } private void collectValue(int doc, long rounded) throws IOException { @@ -748,8 +748,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long rounded) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index eb7f167015d13..9aac043dd52cc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -255,8 +255,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } @@ -288,8 +288,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void 
collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/NumericHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/NumericHistogramAggregator.java index a8b4e2fea40ff..14c836474285f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/NumericHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/NumericHistogramAggregator.java @@ -145,8 +145,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java index aad698a7a2ba8..5b661e1b4e47b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java @@ -365,8 +365,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } private int collect(int doc, double value, long owningBucketOrdinal, int lowBound) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 903155588880a..304949c00466f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -281,8 +281,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -305,8 +305,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -335,8 +335,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -362,8 +362,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -600,8 +600,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long 
bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -627,8 +627,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -1243,8 +1243,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java index 26bc12c98538b..35aedce4ec253 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java @@ -124,8 +124,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 8d8008e053f85..7708400c21246 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -165,8 +165,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }); } @@ -733,8 +733,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 1031a0370e57d..44fbcf0d80479 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -166,8 +166,8 @@ public void collect(DocIdStream stream, long bucket) throws IOException { } @Override - public void collectRange(int min, int max) throws IOException { - setKahanSummation(0); + public void collectRange(int min, int max, long bucket) throws IOException { + setKahanSummation(bucket); int count = 0; for (int docId = min; docId < max; docId++) { if (values.advanceExact(docId)) { @@ -178,9 +178,9 @@ public void collectRange(int min, int max) throws IOException { } } } - counts.increment(0, count); - sums.set(0, kahanSummation.value()); - compensations.set(0, kahanSummation.delta()); + counts.increment(bucket, count); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); } private void setKahanSummation(long bucket) { diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregator.java index d926bb1d0d273..e29f8cdec5436 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregator.java @@ -170,8 +170,8 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException } @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collectRange(int min, int max, long bucket) throws IOException { + super.collectRange(min, max, bucket); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 8a656d768cee2..cdc8fbf24f007 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -181,15 +181,15 @@ public void collect(DocIdStream stream, long bucket) throws IOException { } @Override - public void collectRange(int min, int max) throws IOException { - growMaxes(0); - double maximum = maxes.get(0); + public void collectRange(int min, int max, long bucket) throws IOException { + growMaxes(bucket); + double maximum = maxes.get(bucket); for (int doc = min; doc < max; doc++) { if (values.advanceExact(doc)) { maximum = Math.max(maximum, values.doubleValue()); } } - maxes.set(0, maximum); + maxes.set(bucket, maximum); } private void growMaxes(long bucket) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index cb4b530b5bda2..a280c948b84ee 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
@@ -181,15 +181,15 @@ public void collect(DocIdStream stream, long bucket) throws IOException {
             }
 
             @Override
-            public void collectRange(int min, int max) throws IOException {
-                growMins(0);
-                double minimum = mins.get(0);
+            public void collectRange(int min, int max, long bucket) throws IOException {
+                growMins(bucket);
+                double minimum = mins.get(bucket);
                 for (int doc = min; doc < max; doc++) {
                     if (values.advanceExact(doc)) {
                         minimum = Math.min(minimum, values.doubleValue());
                     }
                 }
-                mins.set(0, minimum);
+                mins.set(bucket, minimum);
             }
 
             private void growMins(long bucket) {
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java
index 98fc5cc4d6d42..0a1a200dae2f1 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java
@@ -154,11 +154,11 @@ public void collect(DocIdStream stream, long bucket) throws IOException {
             }
 
             @Override
-            public void collectRange(int min, int max) throws IOException {
-                growStats(0);
+            public void collectRange(int min, int max, long bucket) throws IOException {
+                growStats(bucket);
 
-                double minimum = mins.get(0);
-                double maximum = maxes.get(0);
-                for (int doc = min; doc < maximum; doc++) {
+                double minimum = mins.get(bucket);
+                double maximum = maxes.get(bucket);
+                for (int doc = min; doc < max; doc++) { // bound is the exclusive doc ID limit, not the running maximum
                     if (values.advanceExact(doc)) {
                         final int valuesCount = values.docValueCount();
@@ -172,10 +172,10 @@ public void collectRange(int min, int max) throws IOException {
                         }
                     }
                 }
-                sums.set(0, kahanSummation.value());
-                compensations.set(0, kahanSummation.delta());
-                mins.set(0, minimum);
-                maxes.set(0, maximum);
+                sums.set(bucket, kahanSummation.value());
+                compensations.set(bucket, kahanSummation.delta());
+                mins.set(bucket, minimum);
+                maxes.set(bucket, maximum);
             }
 
             private void growStats(long bucket) {
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
index 29228afb8ce8e..c62d6c93fbcce 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
@@ -149,8 +149,8 @@ public void collect(DocIdStream stream, long bucket) throws IOException {
             }
 
             @Override
-            public void collectRange(int min, int max) throws IOException {
-                setKahanSummation(0);
+            public void collectRange(int min, int max, long bucket) throws IOException {
+                setKahanSummation(bucket);
                 for (int docId = min; docId < max; docId++) {
                     if (values.advanceExact(docId)) {
                         for (int i = 0; i < values.docValueCount(); i++) {
@@ -158,8 +158,8 @@ public void collectRange(int min, int max) throws IOException {
                         }
                     }
                 }
-                sums.set(0, kahanSummation.value());
-                compensations.set(0, kahanSummation.delta());
+                sums.set(bucket, kahanSummation.value());
+                compensations.set(bucket, kahanSummation.delta());
             }
 
             private void setKahanSummation(long bucket) {