From 839c94a4765afc363e57b77707ee3953f7a0bc95 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Thu, 20 Nov 2025 18:32:22 -0800 Subject: [PATCH 01/17] Specific implementation for CardinalityAggregator Signed-off-by: Ankit Jain --- .../aggregations/LeafBucketCollector.java | 21 ----------- .../metrics/CardinalityAggregator.java | 35 ++++++++++++------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index a196549b41a51..8ecf949cd31ad 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -131,27 +131,6 @@ public void collect(DocIdStream stream) throws IOException { collect(stream, 0); } - /** - * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code - * max} is guaranteed to be greater than {@code min}. - * - *

<p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
-     * in a {@link DocValuesSkipper}.
-     *
-     * <p>

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that - * matches the given range. - * - * @see #collect(int,long) - */ - @Override - public void collectRange(int min, int max) throws IOException { - // Different aggregator implementations should override this method even if to just delegate to super for - // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. - for (int docId = min; docId < max; docId++) { - collect(docId, 0); - } - } - /** * Bulk-collect doc IDs within {@code owningBucketOrd}. * diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 8cb21c5b6effb..8de6b12ac3ff4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -579,12 +579,31 @@ public static long memoryOverhead(long maxOrd) { @Override public void collect(int doc, long bucketOrd) throws IOException { - visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1); - BitArray bits = visitedOrds.get(bucketOrd); + collect(doc, getBitArray(bucketOrd)); + } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + visitedOrds = bigArrays.grow(visitedOrds, owningBucketOrd + 1); + BitArray bits = visitedOrds.get(owningBucketOrd); + if (bits == null) { + bits = new BitArray(maxOrd, bigArrays); + visitedOrds.set(owningBucketOrd, bits); + } + stream.forEach((doc) -> collect(doc, getBitArray(owningBucketOrd))); + } + + private BitArray getBitArray(long bucket) { + visitedOrds = bigArrays.grow(visitedOrds, bucket + 1); + BitArray bits = visitedOrds.get(bucket); if (bits == null) { bits = new BitArray(maxOrd, bigArrays); - visitedOrds.set(bucketOrd, bits); + visitedOrds.set(bucket, bits); } + return bits; + } + + private void collect(final int doc, final BitArray bits) throws IOException { if (values.advanceExact(doc)) { int count = values.docValueCount(); long ord; @@ -594,16 +613,6 @@ public void collect(int doc, long bucketOrd) throws IOException { } } - @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); - } - - @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); - } - @Override public void postCollect() throws IOException { try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) { From 34e21916ca08dd666ce581dd79cb7892e28ddffa Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Thu, 20 Nov 2025 22:14:43 -0800 Subject: [PATCH 02/17] Applying spotless Signed-off-by: Ankit Jain --- .../aggregations/LeafBucketCollector.java | 313 ++++++++++-------- .../metrics/CardinalityAggregator.java | 15 +- 2 files changed, 176 insertions(+), 152 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index 8ecf949cd31ad..ab9c87b410cff 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -1,158 +1,179 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this 
file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.search.aggregations; - -import org.apache.lucene.index.DocValuesSkipper; -import org.apache.lucene.search.DocIdStream; -import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.Scorable; -import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; - -import java.io.IOException; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -/** - * Per-leaf bucket collector. - * - * @opensearch.internal - */ -public abstract class LeafBucketCollector implements LeafCollector { - - public static final LeafBucketCollector NO_OP_COLLECTOR = new LeafBucketCollector() { - @Override - public void setScorer(Scorable arg0) throws IOException { - // no-op - } + /* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ - @Override - public void collect(int doc, long bucket) { - // no-op - } - }; - - public static LeafBucketCollector wrap(Iterable collectors) { - final Stream actualCollectors = StreamSupport.stream(collectors.spliterator(), false) - .filter(c -> c != NO_OP_COLLECTOR); - final LeafBucketCollector[] colls = actualCollectors.toArray(size -> new LeafBucketCollector[size]); - switch (colls.length) { - case 0: - return NO_OP_COLLECTOR; - case 1: - return colls[0]; - default: - return new LeafBucketCollector() { - - @Override - public void setScorer(Scorable s) throws IOException { - for (LeafBucketCollector c : colls) { - c.setScorer(s); + /* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + /* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + + package org.opensearch.search.aggregations; + + import org.apache.lucene.index.DocValuesSkipper; + import org.apache.lucene.search.DocIdStream; + import org.apache.lucene.search.LeafCollector; + import org.apache.lucene.search.Scorable; + import org.opensearch.common.annotation.ExperimentalApi; + import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; + + import java.io.IOException; + import java.util.stream.Stream; + import java.util.stream.StreamSupport; + + /** + * Per-leaf bucket collector. + * + * @opensearch.internal + */ + public abstract class LeafBucketCollector implements LeafCollector { + + public static final LeafBucketCollector NO_OP_COLLECTOR = new LeafBucketCollector() { + @Override + public void setScorer(Scorable arg0) throws IOException { + // no-op + } + + @Override + public void collect(int doc, long bucket) { + // no-op + } + }; + + public static LeafBucketCollector wrap(Iterable collectors) { + final Stream actualCollectors = StreamSupport.stream(collectors.spliterator(), false) + .filter(c -> c != NO_OP_COLLECTOR); + final LeafBucketCollector[] colls = actualCollectors.toArray(size -> new LeafBucketCollector[size]); + switch (colls.length) { + case 0: + return NO_OP_COLLECTOR; + case 1: + return colls[0]; + default: + return new LeafBucketCollector() { + + @Override + public void setScorer(Scorable s) throws IOException { + for (LeafBucketCollector c : colls) { + c.setScorer(s); + } } - } - @Override - public void collect(int doc, long bucket) throws IOException { - for (LeafBucketCollector c : colls) { - c.collect(doc, bucket); + @Override + public void collect(int doc, long bucket) throws IOException { + for (LeafBucketCollector c : colls) { + c.collect(doc, bucket); + } } - } - }; + }; + } } - } - /** - * Collect the given {@code doc} in the bucket owned by - * {@code owningBucketOrd}. - *

-     * The implementation of this method for metric aggregations is generally
-     * something along the lines of
-     * <pre>{@code
-     * array[owningBucketOrd] += loadValueFromDoc(doc)
-     * }</pre>
-     * <p>

Bucket aggregations have more trouble because their job is to - * make new ordinals. So their implementation generally - * looks kind of like - *

<pre>{@code
-     * long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
-     * collectBucket(doc, myBucketOrd);
-     * }</pre>
-     * <p>

- * Some bucket aggregations "know" how many ordinals each owning ordinal - * needs so they can map "densely". The {@code range} aggregation, for - * example, can perform this mapping with something like: - *

<pre>{@code
-     * return rangeCount * owningBucketOrd + matchingRange(value);
-     * }</pre>
- * Other aggregations don't know how many buckets will fall into any - * particular owning bucket. The {@code terms} aggregation, for example, - * uses {@link LongKeyedBucketOrds} which amounts to a hash lookup. - */ - public abstract void collect(int doc, long owningBucketOrd) throws IOException; + /** + * Collect the given {@code doc} in the bucket owned by + * {@code owningBucketOrd}. + *

+         * The implementation of this method for metric aggregations is generally
+         * something along the lines of
+         * <pre>{@code
+         * array[owningBucketOrd] += loadValueFromDoc(doc)
+         * }</pre>
+         * <p>

Bucket aggregations have more trouble because their job is to + * make new ordinals. So their implementation generally + * looks kind of like + *

<pre>{@code
+         * long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
+         * collectBucket(doc, myBucketOrd);
+         * }</pre>
+         * <p>

+ * Some bucket aggregations "know" how many ordinals each owning ordinal + * needs so they can map "densely". The {@code range} aggregation, for + * example, can perform this mapping with something like: + *

<pre>{@code
+         * return rangeCount * owningBucketOrd + matchingRange(value);
+         * }</pre>
+ * Other aggregations don't know how many buckets will fall into any + * particular owning bucket. The {@code terms} aggregation, for example, + * uses {@link LongKeyedBucketOrds} which amounts to a hash lookup. + */ + public abstract void collect(int doc, long owningBucketOrd) throws IOException; - @Override - public void collect(int doc) throws IOException { - collect(doc, 0); - } + @Override + public void collect(int doc) throws IOException { + collect(doc, 0); + } - @Override - public void collect(DocIdStream stream) throws IOException { - collect(stream, 0); - } + @Override + public void collect(DocIdStream stream) throws IOException { + collect(stream, 0); + } - /** - * Bulk-collect doc IDs within {@code owningBucketOrd}. - * - *

<p>Note: The provided {@link DocIdStream} may be reused across calls and should be consumed immediately.
-     *
-     * <p>Note: The provided DocIdStream typically only holds a small subset of query matches. This method may be called multiple times per segment.
-     * Like collect(int), it is guaranteed that doc IDs get collected in order, i.e. doc IDs are collected in order within a DocIdStream, and if
-     * called twice, all doc IDs from the second DocIdStream will be greater than all doc IDs from the first DocIdStream.
-     *
-     * <p>It is legal for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link #collect(int, long)}.
-     *
-     * <p>

The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}. - */ - @ExperimentalApi - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - // Different aggregator implementations should override this method even if to just delegate to super for - // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. - stream.forEach((doc) -> collect(doc, owningBucketOrd)); - } + /** + * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code + * max} is guaranteed to be greater than {@code min}. + * + *

<p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
+         * in a {@link DocValuesSkipper}.
+         *
+         * <p>

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that + * matches the given range. + * + * @see #collect(int, long) + */ + @Override + public void collectRange(int min, int max) throws IOException { + // Different aggregator implementations should override this method even if to just delegate to super for + // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. + for (int docId = min; docId < max; docId++) { + collect(docId, 0); + } + } + + /** + * Bulk-collect doc IDs within {@code owningBucketOrd}. + * + *

<p>Note: The provided {@link DocIdStream} may be reused across calls and should be consumed immediately.
+         *
+         * <p>Note: The provided DocIdStream typically only holds a small subset of query matches. This method may be called multiple times per segment.
+         * Like collect(int), it is guaranteed that doc IDs get collected in order, i.e. doc IDs are collected in order within a DocIdStream, and if
+         * called twice, all doc IDs from the second DocIdStream will be greater than all doc IDs from the first DocIdStream.
+         *
+         * <p>It is legal for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link #collect(int, long)}.
+         *
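+         * <p>A minimal override that simply delegates, matching the default described below
+         * (a sketch, assuming the enclosing collector implements {@code collect(int, long)}):
+         * <pre>{@code
+         * @Override
+         * public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
+         *     stream.forEach(doc -> collect(doc, owningBucketOrd));
+         * }
+         * }</pre>
+         *
+         * <p>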

The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}. + */ + @ExperimentalApi + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + // Different aggregator implementations should override this method even if to just delegate to super for + // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. + stream.forEach((doc) -> collect(doc, owningBucketOrd)); + } - @Override - public void setScorer(Scorable scorer) throws IOException { - // no-op by default + @Override + public void setScorer(Scorable scorer) throws IOException { + // no-op by default + } } -} diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 8de6b12ac3ff4..5e1617001c6ac 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -584,13 +584,16 @@ public void collect(int doc, long bucketOrd) throws IOException { @Override public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - visitedOrds = bigArrays.grow(visitedOrds, owningBucketOrd + 1); - BitArray bits = visitedOrds.get(owningBucketOrd); - if (bits == null) { - bits = new BitArray(maxOrd, bigArrays); - visitedOrds.set(owningBucketOrd, bits); + final BitArray bits = getBitArray(owningBucketOrd); + stream.forEach((doc) -> collect(doc, bits)); + } + + @Override + public void collectRange(int minDoc, int maxDoc) throws IOException { + final BitArray bits = getBitArray(0); + for (int doc = minDoc; doc < maxDoc; ++doc) { + collect(doc, bits); } - stream.forEach((doc) -> collect(doc, getBitArray(owningBucketOrd))); } private BitArray getBitArray(long bucket) { From a03cf47f2469983012fa1b3d782ee6517592b42a Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Thu, 20 Nov 2025 22:16:14 -0800 Subject: [PATCH 03/17] Applying spotless Signed-off-by: Ankit Jain --- .../aggregations/LeafBucketCollector.java | 330 +++++++++--------- 1 file changed, 165 insertions(+), 165 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index ab9c87b410cff..a40d595d189d3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -1,179 +1,179 @@ - /* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - - /* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - /* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - - package org.opensearch.search.aggregations; - - import org.apache.lucene.index.DocValuesSkipper; - import org.apache.lucene.search.DocIdStream; - import org.apache.lucene.search.LeafCollector; - import org.apache.lucene.search.Scorable; - import org.opensearch.common.annotation.ExperimentalApi; - import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; - - import java.io.IOException; - import java.util.stream.Stream; - import java.util.stream.StreamSupport; +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.search.aggregations; + +import org.apache.lucene.index.DocValuesSkipper; +import org.apache.lucene.search.DocIdStream; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Scorable; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; + +import java.io.IOException; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Per-leaf bucket collector. + * + * @opensearch.internal + */ +public abstract class LeafBucketCollector implements LeafCollector { + + public static final LeafBucketCollector NO_OP_COLLECTOR = new LeafBucketCollector() { + @Override + public void setScorer(Scorable arg0) throws IOException { + // no-op + } - /** - * Per-leaf bucket collector. 
- * - * @opensearch.internal - */ - public abstract class LeafBucketCollector implements LeafCollector { - - public static final LeafBucketCollector NO_OP_COLLECTOR = new LeafBucketCollector() { - @Override - public void setScorer(Scorable arg0) throws IOException { - // no-op - } - - @Override - public void collect(int doc, long bucket) { - // no-op - } - }; - - public static LeafBucketCollector wrap(Iterable collectors) { - final Stream actualCollectors = StreamSupport.stream(collectors.spliterator(), false) - .filter(c -> c != NO_OP_COLLECTOR); - final LeafBucketCollector[] colls = actualCollectors.toArray(size -> new LeafBucketCollector[size]); - switch (colls.length) { - case 0: - return NO_OP_COLLECTOR; - case 1: - return colls[0]; - default: - return new LeafBucketCollector() { - - @Override - public void setScorer(Scorable s) throws IOException { - for (LeafBucketCollector c : colls) { - c.setScorer(s); - } + @Override + public void collect(int doc, long bucket) { + // no-op + } + }; + + public static LeafBucketCollector wrap(Iterable collectors) { + final Stream actualCollectors = StreamSupport.stream(collectors.spliterator(), false) + .filter(c -> c != NO_OP_COLLECTOR); + final LeafBucketCollector[] colls = actualCollectors.toArray(size -> new LeafBucketCollector[size]); + switch (colls.length) { + case 0: + return NO_OP_COLLECTOR; + case 1: + return colls[0]; + default: + return new LeafBucketCollector() { + + @Override + public void setScorer(Scorable s) throws IOException { + for (LeafBucketCollector c : colls) { + c.setScorer(s); } + } - @Override - public void collect(int doc, long bucket) throws IOException { - for (LeafBucketCollector c : colls) { - c.collect(doc, bucket); - } + @Override + public void collect(int doc, long bucket) throws IOException { + for (LeafBucketCollector c : colls) { + c.collect(doc, bucket); } + } - }; - } + }; } + } - /** - * Collect the given {@code doc} in the bucket owned by - * {@code owningBucketOrd}. - *

-         * The implementation of this method for metric aggregations is generally
-         * something along the lines of
-         * <pre>{@code
-         * array[owningBucketOrd] += loadValueFromDoc(doc)
-         * }</pre>
-         * <p>

Bucket aggregations have more trouble because their job is to - * make new ordinals. So their implementation generally - * looks kind of like - *

<pre>{@code
-         * long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
-         * collectBucket(doc, myBucketOrd);
-         * }</pre>
-         * <p>

- * Some bucket aggregations "know" how many ordinals each owning ordinal - * needs so they can map "densely". The {@code range} aggregation, for - * example, can perform this mapping with something like: - *

<pre>{@code
-         * return rangeCount * owningBucketOrd + matchingRange(value);
-         * }</pre>
- * Other aggregations don't know how many buckets will fall into any - * particular owning bucket. The {@code terms} aggregation, for example, - * uses {@link LongKeyedBucketOrds} which amounts to a hash lookup. - */ - public abstract void collect(int doc, long owningBucketOrd) throws IOException; + /** + * Collect the given {@code doc} in the bucket owned by + * {@code owningBucketOrd}. + *

+     * The implementation of this method for metric aggregations is generally
+     * something along the lines of
+     * <pre>{@code
+     * array[owningBucketOrd] += loadValueFromDoc(doc)
+     * }</pre>
+     * <p>

Bucket aggregations have more trouble because their job is to + * make new ordinals. So their implementation generally + * looks kind of like + *

<pre>{@code
+     * long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
+     * collectBucket(doc, myBucketOrd);
+     * }</pre>
+     * <p>

+ * Some bucket aggregations "know" how many ordinals each owning ordinal + * needs so they can map "densely". The {@code range} aggregation, for + * example, can perform this mapping with something like: + *

<pre>{@code
+     * return rangeCount * owningBucketOrd + matchingRange(value);
+     * }</pre>
+ * Other aggregations don't know how many buckets will fall into any + * particular owning bucket. The {@code terms} aggregation, for example, + * uses {@link LongKeyedBucketOrds} which amounts to a hash lookup. + */ + public abstract void collect(int doc, long owningBucketOrd) throws IOException; - @Override - public void collect(int doc) throws IOException { - collect(doc, 0); - } + @Override + public void collect(int doc) throws IOException { + collect(doc, 0); + } - @Override - public void collect(DocIdStream stream) throws IOException { - collect(stream, 0); - } + @Override + public void collect(DocIdStream stream) throws IOException { + collect(stream, 0); + } - /** - * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code - * max} is guaranteed to be greater than {@code min}. - * - *

<p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
-         * in a {@link DocValuesSkipper}.
-         *
-         * <p>

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that - * matches the given range. - * - * @see #collect(int, long) - */ - @Override - public void collectRange(int min, int max) throws IOException { - // Different aggregator implementations should override this method even if to just delegate to super for - // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. - for (int docId = min; docId < max; docId++) { - collect(docId, 0); - } + /** + * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code + * max} is guaranteed to be greater than {@code min}. + * + *

<p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
+     * in a {@link DocValuesSkipper}.
+     *
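+     * <p>For reference, the fallback here is equivalent to the following loop, which
+     * collects every doc ID in the range into bucket 0:
+     * <pre>{@code
+     * for (int docId = min; docId < max; docId++) {
+     *     collect(docId, 0);
+     * }
+     * }</pre>
+     *
+     * <p>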

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that + * matches the given range. + * + * @see #collect(int, long) + */ + @Override + public void collectRange(int min, int max) throws IOException { + // Different aggregator implementations should override this method even if to just delegate to super for + // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. + for (int docId = min; docId < max; docId++) { + collect(docId, 0); } + } - /** - * Bulk-collect doc IDs within {@code owningBucketOrd}. - * - *

<p>Note: The provided {@link DocIdStream} may be reused across calls and should be consumed immediately.
-         *
-         * <p>Note: The provided DocIdStream typically only holds a small subset of query matches. This method may be called multiple times per segment.
-         * Like collect(int), it is guaranteed that doc IDs get collected in order, i.e. doc IDs are collected in order within a DocIdStream, and if
-         * called twice, all doc IDs from the second DocIdStream will be greater than all doc IDs from the first DocIdStream.
-         *
-         * <p>It is legal for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link #collect(int, long)}.
-         *
-         * <p>

The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}. - */ - @ExperimentalApi - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - // Different aggregator implementations should override this method even if to just delegate to super for - // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. - stream.forEach((doc) -> collect(doc, owningBucketOrd)); - } + /** + * Bulk-collect doc IDs within {@code owningBucketOrd}. + * + *

<p>Note: The provided {@link DocIdStream} may be reused across calls and should be consumed immediately.
+     *
+     * <p>Note: The provided DocIdStream typically only holds a small subset of query matches. This method may be called multiple times per segment.
+     * Like collect(int), it is guaranteed that doc IDs get collected in order, i.e. doc IDs are collected in order within a DocIdStream, and if
+     * called twice, all doc IDs from the second DocIdStream will be greater than all doc IDs from the first DocIdStream.
+     *
+     * <p>It is legal for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link #collect(int, long)}.
+     *
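+     * <p>Implementations typically resolve per-bucket state once and then consume the
+     * stream in bulk; a sketch following the cardinality aggregator in this change, where
+     * {@code getBitArray} is that aggregator's own helper:
+     * <pre>{@code
+     * @Override
+     * public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
+     *     BitArray bits = getBitArray(owningBucketOrd);
+     *     stream.forEach(doc -> collect(doc, bits));
+     * }
+     * }</pre>
+     *
+     * <p>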

The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}. + */ + @ExperimentalApi + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + // Different aggregator implementations should override this method even if to just delegate to super for + // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. + stream.forEach((doc) -> collect(doc, owningBucketOrd)); + } - @Override - public void setScorer(Scorable scorer) throws IOException { - // no-op by default - } + @Override + public void setScorer(Scorable scorer) throws IOException { + // no-op by default } +} From b792eeb7100e1af149d28848515279c3d4a8f86d Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Fri, 21 Nov 2025 00:29:28 -0800 Subject: [PATCH 04/17] Adding changelog entry Signed-off-by: Ankit Jain --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0daf1a4d5f1c..6145721204388 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643)) - Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933)) - Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009)) +- Bulk collection logic for CardinalityAggregator ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067)) - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) From e724df434a7f6f876049255a36af11dd6f93278e Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Fri, 21 Nov 2025 00:30:09 -0800 Subject: [PATCH 05/17] typo Signed-off-by: Ankit Jain --- .../org/opensearch/search/aggregations/LeafBucketCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index a40d595d189d3..a196549b41a51 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -141,7 +141,7 @@ public void collect(DocIdStream stream) throws IOException { *

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that * matches the given range. * - * @see #collect(int, long) + * @see #collect(int,long) */ @Override public void collectRange(int min, int max) throws IOException { From 19974be20d67a3920aac4267bcb5b1688f8624d3 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Sat, 22 Nov 2025 01:18:21 -0800 Subject: [PATCH 06/17] Metrics aggregation changes Signed-off-by: Ankit Jain --- .../index/fielddata/NumericDoubleValues.java | 1 + .../aggregations/LeafBucketCollector.java | 21 ------------ .../aggregations/metrics/AvgAggregator.java | 34 +++++++++++++++---- .../aggregations/metrics/CompensatedSum.java | 20 +++++++++++ .../aggregations/metrics/MaxAggregator.java | 23 ++++++++----- .../aggregations/metrics/MinAggregator.java | 20 +++++++---- .../aggregations/metrics/SumAggregator.java | 32 +++++++++++++---- 7 files changed, 101 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java b/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java index f69cfacaf35d4..65c31ea0035d8 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java +++ b/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java @@ -33,6 +33,7 @@ package org.opensearch.index.fielddata; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.DoubleValues; import org.opensearch.common.annotation.PublicApi; diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index a196549b41a51..8ecf949cd31ad 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -131,27 +131,6 @@ public void collect(DocIdStream stream) throws IOException { collect(stream, 0); } - /** - * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code - * max} is guaranteed to be greater than {@code min}. - * - *

<p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
-     * in a {@link DocValuesSkipper}.
-     *
-     * <p>

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that - * matches the given range. - * - * @see #collect(int,long) - */ - @Override - public void collectRange(int min, int max) throws IOException { - // Different aggregator implementations should override this method even if to just delegate to super for - // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. - for (int docId = min; docId < max; docId++) { - collect(docId, 0); - } - } - /** * Bulk-collect doc IDs within {@code owningBucketOrd}. * diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 32cf8953c1542..9bcfe4c551b44 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -132,6 +132,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { + final double[] valueBuffer = new double[64]; @Override public void collect(int doc, long bucket) throws IOException { counts = bigArrays.grow(counts, bucket + 1); @@ -159,13 +160,34 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); - } + public void collect(DocIdStream stream, long bucket) throws IOException { + counts = bigArrays.grow(counts, bucket + 1); + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); - @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + final int[] count = {0,0}; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + for (int i = 0; i < valuesCount; i++) { + if (count[0] == valueBuffer.length) { + kahanSummation.add(valueBuffer, count[0]); + count[0] = 0; + count[1]++; + } + valueBuffer[count[0]++] = values.nextValue(); + } + } + }); + kahanSummation.add(valueBuffer, count[0]); + counts.increment(bucket, (count[1] * valueBuffer.length) + count[0]); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java index 4d6d6d880da2e..53346b505e9c2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java @@ -112,6 +112,26 @@ public CompensatedSum add(double value, double delta) { return this; } + /** + * Increments the Kahan sum by adding two sums, and updating the correction term for reducing numeric errors. + */ + public void add(double[] values, int count) { + // If the value is Inf or NaN, just add it to the running tally to "convert" to + // Inf/NaN. 
This keeps the behavior bwc from before kahan summing + double sum = value; + double c = delta; // Compensation for lost low-order bits + + for (int i=0 ; i= maxes.size()) { @@ -174,13 +173,19 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); - } - - @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collect(DocIdStream stream, long bucket) throws IOException { + if (bucket >= maxes.size()) { + long from = maxes.size(); + maxes = bigArrays.grow(maxes, bucket + 1); + maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); + } + double[] max = {maxes.get(bucket)}; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + max[0] = Math.max(max[0], values.doubleValue()); + } + }); + maxes.set(bucket, max[0]); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index eb3bc0bd4ee34..9daf515a7c449 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -174,13 +174,19 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); - } - - @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collect(DocIdStream stream, long bucket) throws IOException { + if (bucket >= mins.size()) { + long from = mins.size(); + mins = bigArrays.grow(mins, bucket + 1); + mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); + } + double[] min = {mins.get(bucket)}; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + min[0] = Math.min(min[0], values.doubleValue()); + } + }); + mins.set(bucket, min[0]); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index bf450388a14da..f8a01050d10dc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -121,6 +121,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { + final double[] valueBuffer = new double[64]; @Override public void collect(int doc, long bucket) throws IOException { sums = bigArrays.grow(sums, bucket + 1); @@ -145,13 +146,30 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); - } - - @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + public void collect(DocIdStream stream, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + // Compute the sum of double values with Kahan 
summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + final int[] count = {0}; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + for (int i = 0; i < valuesCount; i++) { + if (count[0] == valueBuffer.length) { + kahanSummation.add(valueBuffer, count[0]); + count[0] = 0; + } + valueBuffer[count[0]++] = values.nextValue(); + } + } + }); + kahanSummation.add(valueBuffer, count[0]); + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); } }; } From 922962ba8d0f7b821fa2f2f89214db96a70a26b2 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Sun, 23 Nov 2025 22:43:05 -0800 Subject: [PATCH 07/17] Updating logic to use batches of 512 Signed-off-by: Ankit Jain --- .../aggregations/metrics/AvgAggregator.java | 42 +++++++------------ .../aggregations/metrics/MaxAggregator.java | 29 ++++++++----- .../aggregations/metrics/MinAggregator.java | 33 +++++++++------ .../aggregations/metrics/SumAggregator.java | 34 +++++++-------- 4 files changed, 68 insertions(+), 70 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 9bcfe4c551b44..197e925a7bb26 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -132,28 +132,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { - final double[] valueBuffer = new double[64]; + final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { - counts = bigArrays.grow(counts, bucket + 1); - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - if (values.advanceExact(doc)) { - final int valueCount = values.docValueCount(); - counts.increment(bucket, valueCount); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valueCount; i++) { + setKahanSummation(bucket); + for (int i = 0; i < values.docValueCount(); i++) { double value = values.nextValue(); kahanSummation.add(value); } - sums.set(bucket, kahanSummation.value()); compensations.set(bucket, kahanSummation.delta()); } @@ -161,15 +148,7 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { - counts = bigArrays.grow(counts, bucket + 1); - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. 
- double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); + setKahanSummation(bucket); final int[] count = {0,0}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { @@ -185,10 +164,21 @@ public void collect(DocIdStream stream, long bucket) throws IOException { } }); kahanSummation.add(valueBuffer, count[0]); - counts.increment(bucket, (count[1] * valueBuffer.length) + count[0]); + counts.increment(bucket, ((long) count[1] * valueBuffer.length) + count[0]); sums.set(bucket, kahanSummation.value()); compensations.set(bucket, kahanSummation.delta()); } + + private void setKahanSummation(long bucket) { + counts = bigArrays.grow(counts, bucket + 1); + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index c055b1764c287..944c9b0e41c20 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -157,13 +157,10 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); return new LeafBucketCollectorBase(sub, values) { + final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { - if (bucket >= maxes.size()) { - long from = maxes.size(); - maxes = bigArrays.grow(maxes, bucket + 1); - maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); - } + growMaxes(bucket); if (values.advanceExact(doc)) { final double value = values.doubleValue(); double max = maxes.get(bucket); @@ -174,19 +171,29 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { - if (bucket >= maxes.size()) { - long from = maxes.size(); - maxes = bigArrays.grow(maxes, bucket + 1); - maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); - } + growMaxes(bucket); + int[] count = {0}; double[] max = {maxes.get(bucket)}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { - max[0] = Math.max(max[0], values.doubleValue()); + if (count[0] == valueBuffer.length) { + max[0] = Math.max(max[0], Arrays.stream(valueBuffer).min().orElse(Double.NEGATIVE_INFINITY)); + count[0] = 0; + } + valueBuffer[count[0]++] = values.doubleValue(); } }); + max[0] = Math.min(max[0], Arrays.stream(valueBuffer, 0, count[0]).min().orElse(Double.POSITIVE_INFINITY)); maxes.set(bucket, max[0]); } + + private void growMaxes(long bucket) { + if (bucket >= maxes.size()) { + long from = maxes.size(); + maxes = bigArrays.grow(maxes, bucket + 1); + maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); + } + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 9daf515a7c449..6771e3a6c54ce 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -62,6 +62,7 @@ import org.opensearch.search.streaming.StreamingCostMetrics; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -157,14 +158,10 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); return new LeafBucketCollectorBase(sub, allValues) { - + final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { - if (bucket >= mins.size()) { - long from = mins.size(); - mins = bigArrays.grow(mins, bucket + 1); - mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); - } + growMins(bucket); if (values.advanceExact(doc)) { final double value = values.doubleValue(); double min = mins.get(bucket); @@ -175,19 +172,29 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { - if (bucket >= mins.size()) { - long from = mins.size(); - mins = bigArrays.grow(mins, bucket + 1); - mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); - } - double[] min = {mins.get(bucket)}; + growMins(bucket); + int[] count = {0}; + double[] min = { mins.get(bucket)}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { - min[0] = Math.min(min[0], values.doubleValue()); + if (count[0] == valueBuffer.length) { + min[0] = Math.min(min[0], Arrays.stream(valueBuffer).min().orElse(Double.POSITIVE_INFINITY)); + count[0] = 0; + } + valueBuffer[count[0]++] = values.doubleValue(); } }); + min[0] = Math.min(min[0], Arrays.stream(valueBuffer, 0, count[0]).min().orElse(Double.POSITIVE_INFINITY)); mins.set(bucket, min[0]); } + + private void growMins(long bucket) { + if (bucket >= mins.size()) { + long from = mins.size(); + mins = bigArrays.grow(mins, bucket + 1); + mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); + } + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index f8a01050d10dc..8207fd3fb3c35 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -121,25 +121,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { - final double[] valueBuffer = new double[64]; + final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. 
- double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valuesCount; i++) { + setKahanSummation(bucket); + for (int i = 0; i < values.docValueCount(); i++) { double value = values.nextValue(); kahanSummation.add(value); } - compensations.set(bucket, kahanSummation.delta()); sums.set(bucket, kahanSummation.value()); } @@ -147,13 +137,7 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); + setKahanSummation(bucket); final int[] count = {0}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { @@ -171,6 +155,16 @@ public void collect(DocIdStream stream, long bucket) throws IOException { compensations.set(bucket, kahanSummation.delta()); sums.set(bucket, kahanSummation.value()); } + + private void setKahanSummation(long bucket) { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + } }; } From ca723b8fc96ed69f02c39f23a8b635fab89b6cf3 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Sun, 23 Nov 2025 22:49:04 -0800 Subject: [PATCH 08/17] Adding back collectRange Signed-off-by: Ankit Jain --- .../aggregations/LeafBucketCollector.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index 8ecf949cd31ad..a196549b41a51 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -131,6 +131,27 @@ public void collect(DocIdStream stream) throws IOException { collect(stream, 0); } + /** + * Collect a range of doc IDs, between {@code min} inclusive and {@code max} exclusive. {@code + * max} is guaranteed to be greater than {@code min}. + * + *

<p>Extending this method is typically useful to take advantage of pre-aggregated data exposed
+     * in a {@link DocValuesSkipper}.
+     *
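+     * <p>Because this variant always collects into bucket 0, overrides can hoist bucket
+     * state out of the loop; a sketch mirroring the cardinality aggregator's
+     * {@code collectRange} from this change ({@code getBitArray} is its helper):
+     * <pre>{@code
+     * @Override
+     * public void collectRange(int min, int max) throws IOException {
+     *     BitArray bits = getBitArray(0); // resolve bucket-0 state once
+     *     for (int doc = min; doc < max; doc++) {
+     *         collect(doc, bits);
+     *     }
+     * }
+     * }</pre>
+     *
+     * <p>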

The default implementation calls {@link #collect(DocIdStream)} on a {@link DocIdStream} that + * matches the given range. + * + * @see #collect(int,long) + */ + @Override + public void collectRange(int min, int max) throws IOException { + // Different aggregator implementations should override this method even if to just delegate to super for + // helping the performance: when the super call inlines, calls to #collect(int, long) become monomorphic. + for (int docId = min; docId < max; docId++) { + collect(docId, 0); + } + } + /** * Bulk-collect doc IDs within {@code owningBucketOrd}. * From 3c9ab49bee2e8b2f00fa11d6b7284b09eb62c896 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Sun, 23 Nov 2025 22:57:29 -0800 Subject: [PATCH 09/17] Removing docvalue buffer Signed-off-by: Ankit Jain --- .../aggregations/metrics/AvgAggregator.java | 36 ++++++++++++------- .../aggregations/metrics/MaxAggregator.java | 23 +++++++----- .../aggregations/metrics/MinAggregator.java | 23 +++++++----- .../aggregations/metrics/SumAggregator.java | 26 ++++++++------ 4 files changed, 68 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 197e925a7bb26..dcb3d1a224a4d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -132,7 +132,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { - final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { @@ -149,26 +148,39 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { setKahanSummation(bucket); - final int[] count = {0,0}; + final int[] count = {0}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); - for (int i = 0; i < valuesCount; i++) { - if (count[0] == valueBuffer.length) { - kahanSummation.add(valueBuffer, count[0]); - count[0] = 0; - count[1]++; - } - valueBuffer[count[0]++] = values.nextValue(); + int valueCount = values.docValueCount(); + count[0] += valueCount; + for (int i = 0; i < valueCount; i++) { + kahanSummation.add(values.nextValue()); } } }); - kahanSummation.add(valueBuffer, count[0]); - counts.increment(bucket, ((long) count[1] * valueBuffer.length) + count[0]); + counts.increment(bucket, count[0]); sums.set(bucket, kahanSummation.value()); compensations.set(bucket, kahanSummation.delta()); } + @Override + public void collectRange(int min, int max) throws IOException { + setKahanSummation(0); + int count = 0; + for (int docId = min; docId < max; docId++) { + if (values.advanceExact(docId)) { + int valueCount = values.docValueCount(); + count += valueCount; + for (int i = 0; i < valueCount; i++) { + kahanSummation.add(values.nextValue()); + } + } + } + counts.increment(0, count); + sums.set(0, kahanSummation.value()); + compensations.set(0, kahanSummation.delta()); + } + private void setKahanSummation(long bucket) { counts = bigArrays.grow(counts, bucket + 1); sums = bigArrays.grow(sums, bucket + 1); diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 944c9b0e41c20..741152f8a8816 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -157,7 +157,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); return new LeafBucketCollectorBase(sub, values) { - final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { growMaxes(bucket); @@ -172,21 +171,27 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { growMaxes(bucket); - int[] count = {0}; - double[] max = {maxes.get(bucket)}; + final double[] max = {maxes.get(bucket)}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { - if (count[0] == valueBuffer.length) { - max[0] = Math.max(max[0], Arrays.stream(valueBuffer).min().orElse(Double.NEGATIVE_INFINITY)); - count[0] = 0; - } - valueBuffer[count[0]++] = values.doubleValue(); + max[0] = Math.max(max[0], values.doubleValue()); } }); - max[0] = Math.min(max[0], Arrays.stream(valueBuffer, 0, count[0]).min().orElse(Double.POSITIVE_INFINITY)); maxes.set(bucket, max[0]); } + @Override + public void collectRange(int min, int max) throws IOException { + growMaxes(0); + double maximum = maxes.get(0); + for (int docId = min; docId < max; docId++) { + if (values.advanceExact(docId)) { + maximum = Math.max(maximum, values.doubleValue()); + } + } + maxes.set(0, maximum); + } + private void growMaxes(long bucket) { if (bucket >= maxes.size()) { long from = maxes.size(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 6771e3a6c54ce..42bb43cc61c99 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -158,7 +158,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); return new LeafBucketCollectorBase(sub, allValues) { - final double[] valueBuffer = new double[512]; @Override public void collect(int doc, long bucket) throws IOException { growMins(bucket); @@ -173,21 +172,27 @@ public void collect(int doc, long bucket) throws IOException { @Override public void collect(DocIdStream stream, long bucket) throws IOException { growMins(bucket); - int[] count = {0}; - double[] min = { mins.get(bucket)}; + final double[] min = {mins.get(bucket)}; stream.forEach((doc) -> { if (values.advanceExact(doc)) { - if (count[0] == valueBuffer.length) { - min[0] = Math.min(min[0], Arrays.stream(valueBuffer).min().orElse(Double.POSITIVE_INFINITY)); - count[0] = 0; - } - valueBuffer[count[0]++] = values.doubleValue(); + min[0] = Math.max(min[0], values.doubleValue()); } }); - min[0] = Math.min(min[0], Arrays.stream(valueBuffer, 0, count[0]).min().orElse(Double.POSITIVE_INFINITY)); 
                 mins.set(bucket, min[0]);
             }
 
+            @Override
+            public void collectRange(int min, int max) throws IOException {
+                growMins(0);
+                double minimum = mins.get(0);
+                for (int docId = min; docId < max; docId++) {
+                    if (values.advanceExact(docId)) {
+                        minimum = Math.max(minimum, values.doubleValue());
+                    }
+                }
+                mins.set(0, minimum);
+            }
+
             private void growMins(long bucket) {
                 if (bucket >= mins.size()) {
                     long from = mins.size();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
index 8207fd3fb3c35..29228afb8ce8e 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java
@@ -121,7 +121,6 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
         final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
         final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
         return new LeafBucketCollectorBase(sub, values) {
-            final double[] valueBuffer = new double[512];
             @Override
             public void collect(int doc, long bucket) throws IOException {
                 if (values.advanceExact(doc)) {
@@ -138,24 +137,31 @@ public void collect(int doc, long bucket) throws IOException {
             @Override
             public void collect(DocIdStream stream, long bucket) throws IOException {
                 setKahanSummation(bucket);
-                final int[] count = {0};
                 stream.forEach((doc) -> {
                     if (values.advanceExact(doc)) {
-                        final int valuesCount = values.docValueCount();
-                        for (int i = 0; i < valuesCount; i++) {
-                            if (count[0] == valueBuffer.length) {
-                                kahanSummation.add(valueBuffer, count[0]);
-                                count[0] = 0;
-                            }
-                            valueBuffer[count[0]++] = values.nextValue();
+                        for (int i = 0; i < values.docValueCount(); i++) {
+                            kahanSummation.add(values.nextValue());
                         }
                     }
                 });
-                kahanSummation.add(valueBuffer, count[0]);
                 compensations.set(bucket, kahanSummation.delta());
                 sums.set(bucket, kahanSummation.value());
             }
 
+            @Override
+            public void collectRange(int min, int max) throws IOException {
+                setKahanSummation(0);
+                for (int docId = min; docId < max; docId++) {
+                    if (values.advanceExact(docId)) {
+                        for (int i = 0; i < values.docValueCount(); i++) {
+                            kahanSummation.add(values.nextValue());
+                        }
+                    }
+                }
+                sums.set(0, kahanSummation.value());
+                compensations.set(0, kahanSummation.delta());
+            }
+
             private void setKahanSummation(long bucket) {
                 sums = bigArrays.grow(sums, bucket + 1);
                 compensations = bigArrays.grow(compensations, bucket + 1);

From b82da3815ab2920ea3898d8e114edcaf6b7c2b2b Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Sun, 23 Nov 2025 22:58:25 -0800
Subject: [PATCH 10/17] Applying spotless

Signed-off-by: Ankit Jain
---
 .../org/opensearch/index/fielddata/NumericDoubleValues.java    | 1 -
 .../opensearch/search/aggregations/metrics/AvgAggregator.java  | 2 +-
 .../opensearch/search/aggregations/metrics/CompensatedSum.java | 2 +-
 .../opensearch/search/aggregations/metrics/MaxAggregator.java  | 2 +-
 .../opensearch/search/aggregations/metrics/MinAggregator.java  | 3 +--
 5 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java b/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java
index 65c31ea0035d8..f69cfacaf35d4 100644
--- a/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java
+++ b/server/src/main/java/org/opensearch/index/fielddata/NumericDoubleValues.java
@@ -33,7 +33,6 @@ package org.opensearch.index.fielddata;
 
 import org.apache.lucene.index.NumericDocValues;
-import org.apache.lucene.search.DocIdStream;
 import org.apache.lucene.search.DoubleValues;
 import org.opensearch.common.annotation.PublicApi;
 
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java
index dcb3d1a224a4d..a8df628261e7c 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java
@@ -148,7 +148,7 @@ public void collect(int doc, long bucket) throws IOException {
             @Override
             public void collect(DocIdStream stream, long bucket) throws IOException {
                 setKahanSummation(bucket);
-                final int[] count = {0};
+                final int[] count = { 0 };
                 stream.forEach((doc) -> {
                     if (values.advanceExact(doc)) {
                         int valueCount = values.docValueCount();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java
index 53346b505e9c2..bae482c9c0726 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java
@@ -121,7 +121,7 @@ public void add(double[] values, int count) {
         double sum = value;
         double c = delta; // Compensation for lost low-order bits
-        for (int i=0 ; i<count ; i++) {
+        for (int i = 0; i < count; i++) {
             double y = values[i] - c;
             double t = sum + y;
             c = (t - sum) - y;
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
index 741152f8a8816..3f3099aedfd83 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
@@ -171,7 +171,7 @@ public void collect(int doc, long bucket) throws IOException {
             @Override
             public void collect(DocIdStream stream, long bucket) throws IOException {
                 growMaxes(bucket);
-                final double[] max = {maxes.get(bucket)};
+                final double[] max = { maxes.get(bucket) };
                 stream.forEach((doc) -> {
                     if (values.advanceExact(doc)) {
                         max[0] = Math.max(max[0], values.doubleValue());
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
index 42bb43cc61c99..4a790ab3c312a 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
@@ -62,7 +62,6 @@ import org.opensearch.search.streaming.StreamingCostMetrics;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -172,7 +171,7 @@ public void collect(int doc, long bucket) throws IOException {
             @Override
             public void collect(DocIdStream stream, long bucket) throws IOException {
                 growMins(bucket);
-                final double[] min = {mins.get(bucket)};
+                final double[] min = { mins.get(bucket) };
                 stream.forEach((doc) -> {
                     if (values.advanceExact(doc)) {
                         min[0] = Math.max(min[0], values.doubleValue());
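Note: the CompensatedSum hunk above is a pure spotless reflow, but the loop it touches is the workhorse behind every summing collector in this series. For reference, a minimal self-contained sketch of compensated (Kahan) summation, as an illustrative class only, not the OpenSearch CompensatedSum itself:

    // Compensated (Kahan) summation: 'delta' carries the low-order bits that a
    // plain double add would drop, so many small values do not vanish against a
    // large running total.
    final class KahanSketch {
        private double value; // running sum
        private double delta; // running compensation

        void add(double v) {
            double corrected = v + delta;          // fold previously lost bits back in
            double updated = value + corrected;    // naive add; may round
            delta = corrected - (updated - value); // what that add just lost
            value = updated;
        }

        double value() {
            return value;
        }
    }

Summing 0.1 ten million times with this sketch stays at roughly 1000000.0, while a naive loop drifts in the trailing digits; that is why setKahanSummation restores both the sum and the compensation per bucket.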

From b3156afa9ac46d7057d5f2bf6a0be047d2021623 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Sun, 23 Nov 2025 23:11:24 -0800
Subject: [PATCH 11/17] Adding bulk logic for stats aggregator

Signed-off-by: Ankit Jain
---
 .../aggregations/metrics/StatsAggregator.java | 83 ++++++++++++++-----
 1 file changed, 64 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java
index dbfa4641b1733..98fc5cc4d6d42 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java
@@ -107,28 +107,13 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long bucket) throws IOException {
-                if (bucket >= counts.size()) {
-                    final long from = counts.size();
-                    final long overSize = BigArrays.overSize(bucket + 1);
-                    counts = bigArrays.resize(counts, overSize);
-                    sums = bigArrays.resize(sums, overSize);
-                    compensations = bigArrays.resize(compensations, overSize);
-                    mins = bigArrays.resize(mins, overSize);
-                    maxes = bigArrays.resize(maxes, overSize);
-                    mins.fill(from, overSize, Double.POSITIVE_INFINITY);
-                    maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
-                }
+                growStats(bucket);
                 if (values.advanceExact(doc)) {
                     final int valuesCount = values.docValueCount();
                     counts.increment(bucket, valuesCount);
                     double min = mins.get(bucket);
                     double max = maxes.get(bucket);
-                    // Compute the sum of double values with Kahan summation algorithm which is more
-                    // accurate than naive summation.
-                    double sum = sums.get(bucket);
-                    double compensation = compensations.get(bucket);
-                    kahanSummation.reset(sum, compensation);
 
                     for (int i = 0; i < valuesCount; i++) {
                         double value = values.nextValue();
@@ -144,13 +129,73 @@ public void collect(int doc, long bucket) throws IOException {
             }
 
             @Override
-            public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
-                super.collect(stream, owningBucketOrd);
+            public void collect(DocIdStream stream, long bucket) throws IOException {
+                growStats(bucket);
+
+                double[] min = { mins.get(bucket) };
+                double[] max = { maxes.get(bucket) };
+                stream.forEach((doc) -> {
+                    if (values.advanceExact(doc)) {
+                        final int valuesCount = values.docValueCount();
+                        counts.increment(bucket, valuesCount);
+
+                        for (int i = 0; i < valuesCount; i++) {
+                            double value = values.nextValue();
+                            kahanSummation.add(value);
+                            min[0] = Math.min(min[0], value);
+                            max[0] = Math.max(max[0], value);
+                        }
+                    }
+                });
+                sums.set(bucket, kahanSummation.value());
+                compensations.set(bucket, kahanSummation.delta());
+                mins.set(bucket, min[0]);
+                maxes.set(bucket, max[0]);
             }
 
             @Override
             public void collectRange(int min, int max) throws IOException {
-                super.collectRange(min, max);
+                growStats(0);
+
+                double minimum = mins.get(0);
+                double maximum = maxes.get(0);
+                for (int doc = min; doc < max; doc++) {
+                    if (values.advanceExact(doc)) {
+                        final int valuesCount = values.docValueCount();
+                        counts.increment(0, valuesCount);
+
+                        for (int i = 0; i < valuesCount; i++) {
+                            double value = values.nextValue();
+                            kahanSummation.add(value);
+                            minimum = Math.min(minimum, value);
+                            maximum = Math.max(maximum, value);
+                        }
+                    }
+                }
+                sums.set(0, kahanSummation.value());
+                compensations.set(0, kahanSummation.delta());
+                mins.set(0, minimum);
+                maxes.set(0, maximum);
+            }
+
+            private void growStats(long bucket) {
+                if (bucket >= counts.size()) {
+                    final long from = counts.size();
+                    final long overSize = BigArrays.overSize(bucket + 1);
+                    counts = bigArrays.resize(counts, overSize);
+                    sums = bigArrays.resize(sums, overSize);
+                    compensations = bigArrays.resize(compensations, overSize);
+                    mins = bigArrays.resize(mins, overSize);
+                    maxes = bigArrays.resize(maxes, overSize);
+                    mins.fill(from, overSize, Double.POSITIVE_INFINITY);
+                    maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
+                }
+
+                // Compute the sum of double values with Kahan summation algorithm which is more
+                // accurate than naive summation.
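+                double sum = sums.get(bucket);
+                double compensation = compensations.get(bucket);
+                kahanSummation.reset(sum, compensation);
             }
         };
     }

Note: patch 11 gives StatsAggregator the same shape the earlier patches gave the single-metric collectors: grow and reset once up front, fold matching docs into locals in a tight loop, write back once per bucket (the real code also threads the CompensatedSum through the sum, and the loop bound in collectRange is the doc id limit, not the running maximum). A condensed sketch of that shape, where DocValueSource is an assumed stand-in for the real doc-values interface, not the OpenSearch API:

    import java.io.IOException;

    interface DocValueSource {
        boolean advanceExact(int doc) throws IOException; // position on doc; false if it has no values
        int docValueCount();                              // number of values for the current doc
        double nextValue() throws IOException;            // next value for the current doc
    }

    final class StatsRangeSketch {
        double count = 0, sum = 0, min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;

        // Analogue of collectRange(min, max): the arguments are doc id bounds, not
        // values, so the loop must run to the doc id limit.
        void collectRange(DocValueSource values, int minDoc, int maxDoc) throws IOException {
            double c = count, s = sum, lo = min, hi = max; // fold into locals
            for (int doc = minDoc; doc < maxDoc; doc++) {
                if (values.advanceExact(doc)) {
                    int n = values.docValueCount();
                    c += n;
                    for (int i = 0; i < n; i++) {
                        double v = values.nextValue();
                        s += v;
                        lo = Math.min(lo, v);
                        hi = Math.max(hi, v);
                    }
                }
            }
            count = c;
            sum = s;
            min = lo;
            max = hi; // single write-back
        }
    }

In the aggregator itself the filter-rewrite path always targets bucket ordinal 0, which is why the collectRange overrides above hard-code that bucket.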

From 52c1fc73553dca0415cc3a611917286045cf3d00 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Sun, 23 Nov 2025 23:12:48 -0800
Subject: [PATCH 12/17] Updating changelog description

Signed-off-by: Ankit Jain
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6145721204388..16b28658ee77b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,7 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643))
 - Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933))
 - Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009))
-- Bulk collection logic for CardinalityAggregator ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067))
+- Bulk collection logic for metrics and cardinality aggregations ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067))
 - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
 - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
 - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))

From abe2810736f45f07e0a292cd78bada0d259fca90 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Mon, 24 Nov 2025 13:04:29 -0800
Subject: [PATCH 13/17] Fixing minor test issue

Signed-off-by: Ankit Jain
---
 .../opensearch/search/aggregations/metrics/AvgAggregator.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java
index a8df628261e7c..1031a0370e57d 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java
@@ -135,8 +135,10 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
             @Override
             public void collect(int doc, long bucket) throws IOException {
                 if (values.advanceExact(doc)) {
+                    int valueCount = values.docValueCount();
                     setKahanSummation(bucket);
-                    for (int i = 0; i < values.docValueCount(); i++) {
+                    counts.increment(bucket, valueCount);
+                    for (int i = 0; i < valueCount; i++) {
                         double value = values.nextValue();
                         kahanSummation.add(value);
                     }

From d2898ed631bfb89218cf56297f61e61935312a87 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Mon, 24 Nov 2025 13:24:25 -0800
Subject: [PATCH 14/17] Fixing minor test issue

Signed-off-by: Ankit Jain
---
 .../search/aggregations/metrics/MaxAggregator.java | 4 ++--
 .../search/aggregations/metrics/MinAggregator.java | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
index 3f3099aedfd83..63c90caf027a7 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
@@ -184,8 +184,8 @@ public void collect(DocIdStream stream, long bucket) throws IOException {
             public void collectRange(int min, int max) throws IOException {
                 growMaxes(0);
                 double maximum = maxes.get(0);
-                for (int docId = min; docId < max; docId++) {
-                    if (values.advanceExact(docId)) {
+                for (int doc = min; doc < max; doc++) {
+                    if (values.advanceExact(doc)) {
                         maximum = Math.max(maximum, values.doubleValue());
                     }
                 }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
index 4a790ab3c312a..cb4b530b5bda2 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
@@ -174,7 +174,7 @@ public void collect(DocIdStream stream, long bucket) throws IOException {
                 final double[] min = { mins.get(bucket) };
                 stream.forEach((doc) -> {
                     if (values.advanceExact(doc)) {
-                        min[0] = Math.max(min[0], values.doubleValue());
+                        min[0] = Math.min(min[0], values.doubleValue());
                     }
                 });
                 mins.set(bucket, min[0]);
@@ -184,9 +184,9 @@ public void collect(DocIdStream stream, long bucket) throws IOException {
             public void collectRange(int min, int max) throws IOException {
                 growMins(0);
                 double minimum = mins.get(0);
-                for (int docId = min; docId < max; docId++) {
-                    if (values.advanceExact(docId)) {
-                        minimum = Math.max(minimum, values.doubleValue());
+                for (int doc = min; doc < max; doc++) {
+                    if (values.advanceExact(doc)) {
+                        minimum = Math.min(minimum, values.doubleValue());
                     }
                 }
                 mins.set(0, minimum);

From c95d6eb3025081a51f82053838be2bf4a3e2eeb0 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Mon, 24 Nov 2025 13:36:12 -0800
Subject: [PATCH 15/17] Fixing minor test issue

Signed-off-by: Ankit Jain
---
 .../opensearch/search/aggregations/metrics/MaxAggregator.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
index 63c90caf027a7..8a656d768cee2 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java
@@ -156,7 +156,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
         final BigArrays bigArrays = context.bigArrays();
         final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
         final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
-        return new LeafBucketCollectorBase(sub, values) {
+        return new LeafBucketCollectorBase(sub, allValues) {
             @Override
             public void collect(int doc, long bucket) throws IOException {
                 growMaxes(bucket);
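Note: patches 14 and 15 fix two easy-to-miss slips: a min fold written with Math.max (introduced by patch 09), and MaxAggregator registering values instead of allValues with its base collector. The first kind is cheap to catch with a direction check like the sketch below, which is illustrative only and not part of the project's test harness:

    // Direction check for a min fold: the result must be <= every input element.
    // 'foldMin' mirrors the accumulator loop in MinAggregator.collectRange.
    public class MinFoldCheck {
        static double foldMin(double[] values) {
            double m = Double.POSITIVE_INFINITY;
            for (double v : values) {
                m = Math.min(m, v); // writing Math.max here is exactly the bug patch 14 removes
            }
            return m;
        }

        public static void main(String[] args) {
            double[] values = { 3, 1, 4, 1, 5 };
            double m = foldMin(values);
            for (double v : values) {
                if (m > v) throw new AssertionError("min fold exceeded an element");
            }
            System.out.println(m); // 1.0
        }
    }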

From 2d422c170b709c47e1c0fd5e894783fe35c91197 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Mon, 24 Nov 2025 16:23:26 -0800
Subject: [PATCH 16/17] Adding tests

Signed-off-by: Ankit Jain
---
 .../FilterRewriteSubAggTests.java             | 198 ++++++++++++++++--
 1 file changed, 186 insertions(+), 12 deletions(-)

diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
index be5add530b406..b0c06d48a8a60 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
@@ -9,8 +9,10 @@
 package org.opensearch.search.aggregations.bucket.filterrewrite;
 
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.KeywordField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -25,8 +27,10 @@
 import org.opensearch.core.common.breaker.CircuitBreaker;
 import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
 import org.opensearch.index.mapper.DateFieldMapper;
+import org.opensearch.index.mapper.KeywordFieldMapper;
 import org.opensearch.index.mapper.NumberFieldMapper;
 import org.opensearch.index.mapper.ParseContext;
+import org.opensearch.index.mapper.TextFieldMapper;
 import org.opensearch.search.aggregations.AggregationBuilder;
 import org.opensearch.search.aggregations.AggregationBuilders;
 import org.opensearch.search.aggregations.Aggregator;
@@ -41,7 +45,12 @@
 import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogram;
 import org.opensearch.search.aggregations.bucket.range.InternalRange;
 import org.opensearch.search.aggregations.bucket.range.RangeAggregationBuilder;
+import org.opensearch.search.aggregations.metrics.InternalAvg;
+import org.opensearch.search.aggregations.metrics.InternalCardinality;
+import org.opensearch.search.aggregations.metrics.InternalMax;
+import org.opensearch.search.aggregations.metrics.InternalMin;
 import org.opensearch.search.aggregations.metrics.InternalStats;
+import org.opensearch.search.aggregations.metrics.InternalSum;
 import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
 import org.opensearch.search.internal.SearchContext;
 
@@ -59,27 +68,34 @@ public class FilterRewriteSubAggTests extends AggregatorTestCase {
     private final String longFieldName = "metric";
     private final String dateFieldName = "timestamp";
+    private final String nameFieldName = "name";
     private final Query matchAllQuery = new MatchAllDocsQuery();
     private final NumberFieldMapper.NumberFieldType longFieldType = new NumberFieldMapper.NumberFieldType(
         longFieldName,
         NumberFieldMapper.NumberType.LONG
     );
     private final DateFieldMapper.DateFieldType dateFieldType = aggregableDateFieldType(false, true);
+    private final KeywordFieldMapper.KeywordFieldType nameFieldType = new KeywordFieldMapper.KeywordFieldType(nameFieldName);
     private final NumberFieldMapper.NumberType numberType = longFieldType.numberType();
     private final String rangeAggName = "range";
     private final String autoDateAggName = "auto";
    private final String dateAggName = "date";
     private final String statsAggName = "stats";
+    private final String avgAggName = "avg";
+    private final String sumAggName = "sum";
+    private final String minAggName = "min";
+    private final String maxAggName = "max";
+    private final String cardinalityAggName = "cardinality";
 
     private final List<TestDoc> DEFAULT_DATA = List.of(
-        new TestDoc(0, Instant.parse("2020-03-01T00:00:00Z")),
-        new TestDoc(1, Instant.parse("2020-03-01T00:00:00Z")),
-        new TestDoc(1, Instant.parse("2020-03-01T00:00:01Z")),
-        new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z")),
-        new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z")),
-        new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z")),
-        new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), true),
-        new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z")),
-        new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z"))
+        new TestDoc(0, Instant.parse("2020-03-01T00:00:00Z"), "abc"),
+        new TestDoc(1, Instant.parse("2020-03-01T00:00:00Z"), "def"),
+        new TestDoc(1, Instant.parse("2020-03-01T00:00:01Z"), "ghi"),
+        new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z"), "jkl"),
+        new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z"), "jkl"),
+        new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z"), "mno"),
+        new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), "prq", true),
+        new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z"), "stu"),
+        new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z"), "stu")
     );
 
     public void testRange() throws IOException {
@@ -111,6 +127,151 @@ public void testRange() throws IOException {
         assertEquals(3, thirdAuto.getBuckets().size());
     }
 
+    public void testRangeWithAvgAndSum() throws IOException {
+        // Test for sum metric aggregation
+        RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName)
+            .addRange(1, 2)
+            .addRange(2, 4)
+            .addRange(4, 6)
+            .subAggregation(AggregationBuilders.sum(sumAggName).field(longFieldName));
+
+        InternalRange result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true);
+
+        // Verify results
+        List buckets = result.getBuckets();
+        assertEquals(3, buckets.size());
+
+        InternalRange.Bucket firstBucket = buckets.get(0);
+        assertEquals(2, firstBucket.getDocCount());
+        InternalSum firstSum = firstBucket.getAggregations().get(sumAggName);
+        assertEquals(2, firstSum.getValue(), 0);
+
+        InternalRange.Bucket secondBucket = buckets.get(1);
+        assertEquals(2, secondBucket.getDocCount());
+        InternalSum secondSum = secondBucket.getAggregations().get(sumAggName);
+        assertEquals(5, secondSum.getValue(), 0);
+
+        InternalRange.Bucket thirdBucket = buckets.get(2);
+        assertEquals(2, thirdBucket.getDocCount());
+        InternalSum thirdSum = thirdBucket.getAggregations().get(sumAggName);
+        assertEquals(9, thirdSum.getValue(), 0);
+
+        // Test for average metric aggregation now
+        rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName)
+            .addRange(1, 2)
+            .addRange(2, 4)
+            .addRange(4, 6)
+            .subAggregation(AggregationBuilders.avg(avgAggName).field(longFieldName));
+
+        result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true);
+
+        // Verify results
+        buckets = result.getBuckets();
+        assertEquals(3, buckets.size());
+
+        firstBucket = buckets.get(0);
+        assertEquals(2, firstBucket.getDocCount());
+        InternalAvg firstAvg = firstBucket.getAggregations().get(avgAggName);
+        assertEquals(1, firstAvg.getValue(), 0);
+
+        secondBucket = buckets.get(1);
+        assertEquals(2, secondBucket.getDocCount());
+        InternalAvg secondAvg = secondBucket.getAggregations().get(avgAggName);
+        assertEquals(2.5, secondAvg.getValue(), 0);
+
+        thirdBucket = buckets.get(2);
+        assertEquals(2, thirdBucket.getDocCount());
+        InternalAvg thirdAvg = thirdBucket.getAggregations().get(avgAggName);
+        assertEquals(4.5, thirdAvg.getValue(), 0);
+    }
+
+    public void testRangeWithMinAndMax() throws IOException {
+        // Test for min metric aggregation
+        RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName)
+            .addRange(1, 2)
+            .addRange(2, 4)
+            .addRange(4, 6)
+            .subAggregation(AggregationBuilders.min(minAggName).field(longFieldName));
+
+        InternalRange result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true);
+
+        // Verify results
+        List buckets = result.getBuckets();
+        assertEquals(3, buckets.size());
+
+        InternalRange.Bucket firstBucket = buckets.get(0);
+        assertEquals(2, firstBucket.getDocCount());
+        InternalMin firstMin = firstBucket.getAggregations().get(minAggName);
+        assertEquals(1, firstMin.getValue(), 0);
+
+        InternalRange.Bucket secondBucket = buckets.get(1);
+        assertEquals(2, secondBucket.getDocCount());
+        InternalMin secondMin = secondBucket.getAggregations().get(minAggName);
+        assertEquals(2, secondMin.getValue(), 0);
+
+        InternalRange.Bucket thirdBucket = buckets.get(2);
+        assertEquals(2, thirdBucket.getDocCount());
+        InternalMin thirdMin = thirdBucket.getAggregations().get(minAggName);
+        assertEquals(4, thirdMin.getValue(), 0);
+
+        // Test for max metric aggregation now
+        rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName)
+            .addRange(1, 2)
+            .addRange(2, 4)
+            .addRange(4, 6)
+            .subAggregation(AggregationBuilders.max(maxAggName).field(longFieldName));
+
+        result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true);
+
+        // Verify results
+        buckets = result.getBuckets();
+        assertEquals(3, buckets.size());
+
+        firstBucket = buckets.get(0);
+        assertEquals(2, firstBucket.getDocCount());
+        InternalMax firstMax = firstBucket.getAggregations().get(maxAggName);
+        assertEquals(1, firstMax.getValue(), 0);
+
+        secondBucket = buckets.get(1);
+        assertEquals(2, secondBucket.getDocCount());
+        InternalMax secondMax = secondBucket.getAggregations().get(maxAggName);
+        assertEquals(3, secondMax.getValue(), 0);
+
+        thirdBucket = buckets.get(2);
+        assertEquals(2, thirdBucket.getDocCount());
+        InternalMax thirdMax = thirdBucket.getAggregations().get(maxAggName);
+        assertEquals(5, thirdMax.getValue(), 0);
+    }
+
+    public void testRangeWithCard() throws IOException {
+        RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName)
+            .addRange(1, 2)
+            .addRange(2, 4)
+            .addRange(4, 6)
+            .subAggregation(AggregationBuilders.cardinality(cardinalityAggName).field(nameFieldName).executionHint("ordinals"));
+
+        InternalRange result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true);
+
+        // Verify results
+        List buckets = result.getBuckets();
+        assertEquals(3, buckets.size());
+
+        InternalRange.Bucket firstBucket = buckets.get(0);
+        assertEquals(2, firstBucket.getDocCount());
+        InternalCardinality firstCardinality = firstBucket.getAggregations().get(cardinalityAggName);
+        assertEquals(2, firstCardinality.getValue(), 0);
+
+        InternalRange.Bucket secondBucket = buckets.get(1);
+        assertEquals(2, secondBucket.getDocCount());
+        InternalCardinality secondCardinality = secondBucket.getAggregations().get(cardinalityAggName);
+        assertEquals(1, secondCardinality.getValue(), 0);
+
+        InternalRange.Bucket thirdBucket = buckets.get(2);
+        assertEquals(2, thirdBucket.getDocCount());
+        InternalCardinality thirdCardinality = thirdBucket.getAggregations().get(cardinalityAggName);
+        assertEquals(2, thirdCardinality.getValue(), 0);
+    }
+
     public void testDateHisto() throws IOException {
         DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder(dateAggName).field(
             dateFieldName
@@ -129,6 +290,7 @@ public void testDateHisto() throws IOException {
         assertEquals(3, firstStats.getCount());
         assertEquals(1, firstStats.getMax(), 0);
         assertEquals(0, firstStats.getMin(), 0);
+        assertEquals(2, firstStats.getSum(), 0);
 
         InternalDateHistogram.Bucket secondBucket = buckets.get(1);
         assertEquals("2020-03-01T01:00:00.000Z", secondBucket.getKeyAsString());
@@ -137,6 +299,7 @@ public void testDateHisto() throws IOException {
         assertEquals(1, secondStats.getCount());
         assertEquals(2, secondStats.getMax(), 0);
         assertEquals(2, secondStats.getMin(), 0);
+        assertEquals(2, secondStats.getSum(), 0);
 
         InternalDateHistogram.Bucket thirdBucket = buckets.get(2);
         assertEquals("2020-03-01T02:00:00.000Z", thirdBucket.getKeyAsString());
@@ -145,6 +308,7 @@ public void testDateHisto() throws IOException {
         assertEquals(1, thirdStats.getCount());
         assertEquals(3, thirdStats.getMax(), 0);
         assertEquals(3, thirdStats.getMin(), 0);
+        assertEquals(3, thirdStats.getSum(), 0);
 
         InternalDateHistogram.Bucket fourthBucket = buckets.get(3);
         assertEquals("2020-03-01T03:00:00.000Z", fourthBucket.getKeyAsString());
@@ -153,6 +317,7 @@ public void testDateHisto() throws IOException {
         assertEquals(1, fourthStats.getCount());
         assertEquals(4, fourthStats.getMax(), 0);
         assertEquals(4, fourthStats.getMin(), 0);
+        assertEquals(4, fourthStats.getSum(), 0);
 
         InternalDateHistogram.Bucket fifthBucket = buckets.get(4);
         assertEquals("2020-03-01T04:00:00.000Z", fifthBucket.getKeyAsString());
@@ -161,6 +326,7 @@ public void testDateHisto() throws IOException {
         assertEquals(2, fifthStats.getCount());
         assertEquals(6, fifthStats.getMax(), 0);
         assertEquals(5, fifthStats.getMin(), 0);
+        assertEquals(11, fifthStats.getSum(), 0);
     }
 
     public void testAutoDateHisto() throws IOException {
@@ -389,7 +555,8 @@ private IA executeAggregationOnReader(
             matchAllQuery,
             bucketConsumer,
             longFieldType,
-            dateFieldType
+            dateFieldType,
+            nameFieldType
         );
         Aggregator aggregator = createAggregator(aggregationBuilder, searchContext);
         CountingAggregator countingAggregator = new CountingAggregator(new AtomicInteger(), aggregator);
@@ -441,15 +608,21 @@ private InternalAggregation.ReduceContext createReduceContext(
     private class TestDoc {
         private final long metric;
         private final Instant timestamp;
+        private final String name;
         private final boolean deleted;
 
         public TestDoc(long metric, Instant timestamp) {
-            this(metric, timestamp, false);
+            this(metric, timestamp, "abc", false);
+        }
+
+        public TestDoc(long metric, Instant timestamp, String name) {
+            this(metric, timestamp, name, false);
         }
 
-        public TestDoc(long metric, Instant timestamp, boolean deleted) {
+        public TestDoc(long metric, Instant timestamp, String name, boolean deleted) {
             this.metric = metric;
             this.timestamp = timestamp;
+            this.name = name;
             this.deleted = deleted;
         }
 
@@ -460,6 +633,7 @@ public ParseContext.Document toDocument() {
             for (Field fld : fieldList)
                 doc.add(fld);
             doc.add(new LongField(dateFieldName, dateFieldType.parse(timestamp.toString()), Field.Store.NO));
+            doc.add(new KeywordField(nameFieldName, name, Field.Store.NO));
 
             return doc;
         }

From 7bda266d4afb4e0f0326b2be64ea55a13d561811 Mon Sep 17 00:00:00 2001
From: Ankit Jain
Date: Mon, 24 Nov 2025 16:24:22 -0800
Subject: [PATCH 17/17] Applying spotless

Signed-off-by: Ankit Jain
---
 .../bucket/filterrewrite/FilterRewriteSubAggTests.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
index b0c06d48a8a60..c59fadee03633 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java
@@ -12,7 +12,6 @@
 import org.apache.lucene.document.KeywordField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -30,7 +29,6 @@
 import org.opensearch.index.mapper.KeywordFieldMapper;
 import org.opensearch.index.mapper.NumberFieldMapper;
 import org.opensearch.index.mapper.ParseContext;
-import org.opensearch.index.mapper.TextFieldMapper;
 import org.opensearch.search.aggregations.AggregationBuilder;
 import org.opensearch.search.aggregations.AggregationBuilders;
 import org.opensearch.search.aggregations.Aggregator;
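A final note on an idiom that recurs throughout the series: DocIdStream.forEach takes a lambda, and Java lambdas can only capture effectively final locals, so the collectors wrap their mutable accumulators in one-element arrays (final double[] max = { ... }). A standalone illustration of why, using IntStream as a stand-in for the Lucene stream (names here are hypothetical, not the project's API):

    import java.util.stream.IntStream;

    // Why the patches write 'final double[] max = { ... }': a lambda passed to
    // forEach cannot reassign a captured local, but it can mutate an array cell.
    public class LambdaAccumulatorSketch {
        public static void main(String[] args) {
            double[] docValues = { 0, 1, 1, 2, 3, 4, 4, 5, 6 };
            final double[] max = { Double.NEGATIVE_INFINITY }; // mutable cell behind a final reference
            IntStream.range(0, docValues.length).forEach(doc -> max[0] = Math.max(max[0], docValues[doc]));
            System.out.println(max[0]); // 6.0
        }
    }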