Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.AggregatorBridge.segmentMatchAll;
Expand Down Expand Up @@ -119,6 +118,7 @@ public final class CompositeAggregator extends BucketsAggregator {
private BucketCollector deferredCollectors;

private boolean earlyTerminated;
private Weight weight;

private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
private LongKeyedBucketOrds bucketOrds;
Expand All @@ -135,11 +135,31 @@ public final class CompositeAggregator extends BucketsAggregator {
) throws IOException {
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
this.size = size;
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
this.sources = new SingleDimensionValuesSource[sourceConfigs.length];
final int numSources = sourceConfigs.length;

// Pre-initialize the destination collections with the correct size
this.sourceNames = new ArrayList<>(numSources);
this.reverseMuls = new int[numSources];
this.missingOrders = new MissingOrder[numSources];
this.formats = new ArrayList<>(numSources);
this.sources = new SingleDimensionValuesSource[numSources];

// Populate all collections
for (int i = 0; i < numSources; i++) {
CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
this.sourceNames.add(sourceConfig.name());
this.reverseMuls[i] = sourceConfig.reverseMul();
this.missingOrders[i] = sourceConfig.missingOrder();
this.formats.add(sourceConfig.format());

this.sources[i] = sourceConfig.createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
}

// check that the provided size is not greater than the search.max_buckets setting
int bucketLimit = context.aggregations().multiBucketConsumer().getLimit();
if (size > bucketLimit) {
Expand All @@ -156,14 +176,6 @@ public final class CompositeAggregator extends BucketsAggregator {
);
}
this.sourceConfigs = sourceConfigs;
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = sourceConfigs[i].createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

Expand Down Expand Up @@ -230,7 +242,7 @@ protected int getSize() {

@Override
protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
return (key) -> bucketOrds.add(0, getRoundingPrepared().round(key));
}
};
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
Expand Down Expand Up @@ -567,8 +579,15 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
}
}

public void setWeight(Weight weight) {
this.weight = weight;
}

@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
if (this.weight != null && this.weight.count(ctx) == 0) {
return true;
}
finishLeaf(); // May need to wrap up previous leaf if it could not be precomputed
return filterRewriteOptimizationContext.tryOptimize(
ctx,
Expand Down
Loading