Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
- Optimize Composite Aggregations by removing unnecessary object allocations ([#18531](https://github.com/opensearch-project/OpenSearch/pull/18531))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.AggregatorBridge.segmentMatchAll;
Expand Down Expand Up @@ -135,11 +134,7 @@ public final class CompositeAggregator extends BucketsAggregator {
) throws IOException {
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
this.size = size;
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
this.sources = new SingleDimensionValuesSource[sourceConfigs.length];

// check that the provided size is not greater than the search.max_buckets setting
int bucketLimit = context.aggregations().multiBucketConsumer().getLimit();
if (size > bucketLimit) {
Expand All @@ -155,15 +150,33 @@ public final class CompositeAggregator extends BucketsAggregator {
bucketLimit
);
}

this.sourceConfigs = sourceConfigs;
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = sourceConfigs[i].createValuesSource(

// Pre-initialize the destination collections with the correct size
final int numSources = sourceConfigs.length;
this.sourceNames = new ArrayList<>(numSources);
this.reverseMuls = new int[numSources];
this.missingOrders = new MissingOrder[numSources];
this.formats = new ArrayList<>(numSources);
this.sources = new SingleDimensionValuesSource[numSources];

// Populate all collections from sourceConfigs
for (int i = 0; i < numSources; i++) {
CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
this.sourceNames.add(sourceConfig.name());
this.reverseMuls[i] = sourceConfig.reverseMul();
this.missingOrders[i] = sourceConfig.missingOrder();
this.formats.add(sourceConfig.format());

this.sources[i] = sourceConfig.createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
}

this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

Expand Down Expand Up @@ -230,7 +243,7 @@ protected int getSize() {

@Override
protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
return (key) -> bucketOrds.add(0, getRoundingPrepared().round(key));
}
};
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
Expand All @@ -247,14 +260,14 @@ protected void doClose() {
}

@Override
protected void doPreCollection() throws IOException {
protected void doPreCollection() {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
deferredCollectors = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}

@Override
protected void doPostCollection() throws IOException {
protected void doPostCollection() {
finishLeaf();
}

Expand Down Expand Up @@ -719,14 +732,7 @@ public void collect(int doc, long zeroBucket) throws IOException {
*
* @opensearch.internal
*/
private static class Entry {
final LeafReaderContext context;
final DocIdSet docIdSet;

Entry(LeafReaderContext context, DocIdSet docIdSet) {
this.context = context;
this.docIdSet = docIdSet;
}
private record Entry(LeafReaderContext context, DocIdSet docIdSet) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ private class Slot {
this.value = initial;
}

// This is to be only for reusable slot
public void set(int newValue) {
this.value = newValue;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -82,6 +87,14 @@ public int hashCode() {
private final Map<Slot, Integer> map; // to quickly find the slot for a value
private final SingleDimensionValuesSource<?>[] arrays;

/**
* A reusable, flyweight Slot instance to avoid object allocation and reduce GC pressure
* during map lookups in the high-frequency collect() path. This object is NOT
* thread-safe, but is safe here because each collector instance is confined to a
* single thread.
*/
private final Slot reusableSlot = new Slot(0);

private LongArray docCounts;
private boolean afterKeyIsSet = false;

Expand Down Expand Up @@ -125,7 +138,8 @@ boolean isFull() {
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
Integer getCurrentSlot() {
return map.get(new Slot(CANDIDATE_SLOT));
reusableSlot.set(CANDIDATE_SLOT); // Update the state of the reusable slot
return map.get(reusableSlot); // Use the single reusable slot instance for the lookup
}

/**
Expand Down Expand Up @@ -322,7 +336,8 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
if (size() >= maxSize) {
// the queue is full, we replace the last key with this candidate
int slot = pop();
map.remove(new Slot(slot));
reusableSlot.set(slot); // Use reusable for remove
map.remove(reusableSlot);
// and we recycle the deleted slot
newSlot = slot;
} else {
Expand Down
Loading