Merged
Changes from 6 commits
@@ -115,6 +115,39 @@
- match: { aggregations.cluster.buckets.0.key: "local_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 5 }

# once more, this time with a pipeline agg
Contributor:

Hmm, should we maybe split this out into its own test? Just thinking that long multi-step yaml tests can be tricky to debug sometimes.

I haven't looked at the rest of the tests in this yaml, though, so it might not be easy to move the indexing (or whatever else) steps up to the setup.

Member Author:

I'll move it, sure! They can get hard to debug.

Member Author:

Ok - this is a test that doesn't clear indices after it runs, so moving things around is a bit more complex than we'd like, to be honest. I can do it, but I think it should wait for a followup.

Contributor:

Ah ok, no worries then. Not worth re-arranging everything for 👍

- do:
Contributor:

Should we have a similar test for the rolling-upgrade module? Theoretically it should be the same as CCS, but it might also smoke out different issues due to heterogeneous serialization inside the same cluster (instead of funneling through a gateway).

Member Author:

It'd be great to have a "mixed cluster CCS" test. I talked that one through with @javanna: we don't have one now, and we probably don't want to build one just for this.

      search:
        rest_total_hits_as_int: true
        index: test_index,my_remote_cluster:test_index
        body:
          seq_no_primary_term: true
          aggs:
            cluster:
              terms:
                field: f1.keyword
              aggs:
                s:
Contributor:

Should we add a non-top-level pipeline agg just to confirm they aren't affected?

Member Author:

I figured I'd get it in my next PR about non-top-level pipeline aggs, but I'm happy to do it now!

                  sum:
                    field: filter_field
            average_sum:
              avg_bucket:
                buckets_path: cluster.s

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- length: { aggregations.cluster.buckets: 2 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- match: { aggregations.cluster.buckets.0.s.value: 2 }
- match: { aggregations.cluster.buckets.1.key: "local_cluster" }
- match: { aggregations.cluster.buckets.1.s.value: 2 }
- match: { aggregations.average_sum.value: 2 }
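# Note on the expected values above: avg_bucket averages the per-bucket sums,
# so with s = 2 in both the remote_cluster and local_cluster buckets,
# average_sum = (2 + 2) / 2 = 2.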

---
"Add transient remote cluster based on the preset cluster":
- do:
@@ -1211,7 +1211,8 @@ public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(SearchRe
return new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService);
return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService,
() -> requestToPipelineTree(request));
}
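// Caller-side sketch (hypothetical local names; a minimal illustration, not
// part of this diff): the supplier defers building the pipeline tree until
// BWC serialization of a partially reduced result actually needs it.
//   Supplier<PipelineTree> lazyTree = () -> requestToPipelineTree(request);
//   InternalAggregation.ReduceContext partial =
//       InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService, lazyTree);
//   // lazyTree.get() runs only if the partial result is sent to an older node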

@Override
@@ -133,17 +133,16 @@ public void execute(SearchContext context) {
}
}
List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
if (pipelineAggregator instanceof SiblingPipelineAggregator) {
siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
} else {
if (false == pipelineAggregator instanceof SiblingPipelineAggregator) {
// TODO move this to request validation after #53669
throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name()
+ "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are "
+ "allowed at the top level");
}
}
context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));
context.queryResult().aggregations(new InternalAggregations(aggregations,
context.request().source().aggregations()::buildPipelineTree));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -37,7 +38,9 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;

/**
@@ -62,12 +65,19 @@ public static class ReduceContext {
private final ScriptService scriptService;
private final IntConsumer multiBucketConsumer;
private final PipelineTree pipelineTreeRoot;
/**
* Supplies the pipelines when the result of the reduce is serialized
* to node versions that need pipeline aggregators to be serialized
* to them.
*/
private final Supplier<PipelineTree> pipelineTreeForBwcSerialization;

/**
* Build a {@linkplain ReduceContext} to perform a partial reduction.
*/
public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService) {
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null);
public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService,
Supplier<PipelineTree> pipelineTreeForBwcSerialization) {
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization);
}

/**
@@ -77,15 +87,16 @@ public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptServi
public static ReduceContext forFinalReduction(BigArrays bigArrays, ScriptService scriptService,
IntConsumer multiBucketConsumer, PipelineTree pipelineTreeRoot) {
return new ReduceContext(bigArrays, scriptService, multiBucketConsumer,
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"));
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"), () -> pipelineTreeRoot);
}
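// Side-by-side sketch of the two factories (arguments assumed from context):
//   ReduceContext partial = ReduceContext.forPartialReduction(bigArrays, scriptService, treeSupplier);
//       // no pipeline tree root yet; the lazy supplier exists only for BWC serialization
//   ReduceContext last = ReduceContext.forFinalReduction(bigArrays, scriptService, consumer, tree);
//       // an eager tree drives the final reduction and also backs the supplier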

private ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer,
PipelineTree pipelineTreeRoot) {
PipelineTree pipelineTreeRoot, Supplier<PipelineTree> pipelineTreeForBwcSerialization) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = pipelineTreeRoot;
this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
}

/**
@@ -112,6 +123,15 @@ public PipelineTree pipelineTreeRoot() {
return pipelineTreeRoot;
}

/**
* Supplies the pipelines when the result of the reduce is serialized
* to node versions that need pipeline aggregators to be serialized
* to them.
*/
public Supplier<PipelineTree> pipelineTreeForBwcSerialization() {
return pipelineTreeForBwcSerialization;
}

/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
@@ -129,9 +149,9 @@ public void consumeBucketsAndMaybeBreak(int size) {
private final List<PipelineAggregator> pipelineAggregators;

/**
* Constructs an get with a given name.
* Constructs an aggregation result with a given name.
*
* @param name The name of the get.
* @param name The name of the aggregation.
*/
protected InternalAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
this.name = name;
@@ -145,14 +165,20 @@ protected InternalAggregation(String name, List<PipelineAggr
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
if (in.getVersion().before(Version.V_8_0_0)) {
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
} else {
pipelineAggregators = emptyList();
}
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metaData);
out.writeNamedWriteableList(pipelineAggregators);
if (out.getVersion().before(Version.V_8_0_0)) {
out.writeNamedWriteableList(pipelineAggregators);
}
doWriteTo(out);
}
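// Round-trip sketch of the gate above (assumed registry and wire version;
// the TODOs later in this PR note the 8.0.0 gate is meant to become 7.7.0):
//   BytesStreamOutput out = new BytesStreamOutput();
//   out.setVersion(Version.V_7_6_0);     // pretend the receiver is an older node
//   agg.writeTo(out);                    // takes the legacy branch above
//   StreamInput in = new NamedWriteableAwareStreamInput(
//           out.bytes().streamInput(), namedWriteableRegistry);
//   in.setVersion(Version.V_7_6_0);      // the BWC read branch repopulates pipelineAggregators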

@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -34,9 +35,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

/**
* An internal implementation of {@link Aggregations}.
*/
@@ -54,35 +59,56 @@ public final class InternalAggregations extends Aggregations implements Writeabl
}
};

private final List<SiblingPipelineAggregator> topLevelPipelineAggregators;
/**
* The way to build a tree of pipeline aggregators. Used only for
* serialization backwards compatibility.
*/
private final Supplier<PipelineAggregator.PipelineTree> pipelineTreeForBwcSerialization;

/**
* Constructs a new aggregation.
*/
public InternalAggregations(List<InternalAggregation> aggregations) {
super(aggregations);
this.topLevelPipelineAggregators = Collections.emptyList();
this.pipelineTreeForBwcSerialization = null;
}

/**
* Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s
* Constructs a node in the aggregation tree.
* @param pipelineTreeSource must be null inside the tree or after final reduction. Should reference the
* search request otherwise so we can properly serialize the response to
* versions of Elasticsearch that require the pipelines to be serialized.
*/
public InternalAggregations(List<InternalAggregation> aggregations, List<SiblingPipelineAggregator> topLevelPipelineAggregators) {
public InternalAggregations(List<InternalAggregation> aggregations, Supplier<PipelineAggregator.PipelineTree> pipelineTreeSource) {
super(aggregations);
this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators);
this.pipelineTreeForBwcSerialization = pipelineTreeSource;
}
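// Usage from this PR's QueryPhase change (shown earlier in this diff): the
// shard-level result now hands over a reference to the request's pipeline
// tree builder rather than pre-built sibling pipeline aggregators:
//   context.queryResult().aggregations(new InternalAggregations(aggregations,
//           context.request().source().aggregations()::buildPipelineTree));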

public InternalAggregations(StreamInput in) throws IOException {
super(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)));
this.topLevelPipelineAggregators = in.readList(
stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class));
if (in.getVersion().before(Version.V_8_0_0)) { // TODO switch to 7.7.0 before merging
in.readNamedWriteableList(PipelineAggregator.class);
}
/*
* Setting the pipeline tree source to null here is correct but
* only because we don't immediately pass the InternalAggregations
* off to another node. Instead, we always reduce together with
* many aggregations and that always adds the
Contributor:

Looks like the comment trails off without finishing its thought :)

Member Author:

Thanks! I do that somet

*/
pipelineTreeForBwcSerialization = null;
}

@Override
@SuppressWarnings("unchecked")
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
out.writeNamedWriteableList(topLevelPipelineAggregators);
if (out.getVersion().before(Version.V_8_0_0)) { // TODO switch to 7.7.0 before merging
if (pipelineTreeForBwcSerialization == null) {
out.writeNamedWriteableList(emptyList());
} else {
out.writeNamedWriteableList(pipelineTreeForBwcSerialization.get().aggregators());
}
}
}

/**
@@ -95,12 +121,17 @@ public List<InternalAggregation> copyResults() {
}

/**
* Returns the top-level pipeline aggregators.
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
* become part of the list of {@link InternalAggregation}s.
* Get the top level pipeline aggregators.
* @deprecated these only exist for BWC serialization
*/
@Deprecated
public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
return topLevelPipelineAggregators;
if (pipelineTreeForBwcSerialization == null) {
return emptyList();
}
return pipelineTreeForBwcSerialization.get().aggregators().stream()
.map(p -> (SiblingPipelineAggregator) p)
.collect(toList());
}

@SuppressWarnings("unchecked")
@@ -131,7 +162,8 @@ public double sortValue(AggregationPath.PathElement head, Iterator<AggregationPa
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
*/
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
InternalAggregations reduced = reduce(aggregationsList, context);
InternalAggregations reduced = reduce(aggregationsList, context,
reducedAggregations -> new InternalAggregations(reducedAggregations, context.pipelineTreeForBwcSerialization()));
if (reduced == null) {
return null;
}
@@ -157,12 +189,16 @@ public static InternalAggregations topLevelReduce(List<InternalAggregations> agg
* {@link InternalAggregations} object found in the list.
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
* @param ctor used to build the {@link InternalAggregations}. The top level reduce specifies a constructor
* that adds pipeline aggregation information that is used to send pipeline aggregations to
* older versions of Elasticsearch that require the pipeline aggregations to be returned
* as part of the aggregation tree
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context,
Function<List<InternalAggregation>, InternalAggregations> ctor) {
if (aggregationsList.isEmpty()) {
return null;
}
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();

// first we collect all aggregations of the same type and list them together
Map<String, List<InternalAggregation>> aggByName = new HashMap<>();
@@ -185,6 +221,14 @@ public static InternalAggregations reduce(List<InternalAggregation
reducedAggregations.add(first.reduce(aggregations, context));
}

return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
return ctor.apply(reducedAggregations);
}

/**
* Version of {@link #reduce(List, ReduceContext, Function)} for nodes inside the aggregation tree.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
return reduce(aggregationsList, context, InternalAggregations::new);
}

}
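A short sketch of how the two reduce entry points are meant to be used after this change (variable names assumed):

// Inside the aggregation tree, the plain constructor is enough:
InternalAggregations inner = InternalAggregations.reduce(shardResults, reduceContext);
// At the top level, the BWC pipeline-tree supplier is threaded in so the
// reduced result can still serialize pipeline aggregators to older nodes:
InternalAggregations top = InternalAggregations.reduce(shardResults, reduceContext,
        aggs -> new InternalAggregations(aggs, reduceContext.pipelineTreeForBwcSerialization()));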
@@ -29,18 +29,14 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
@@ -317,16 +313,8 @@ public void readFromWithId(SearchContextId id, StreamInput in) throws IOExceptio
aggregations = new InternalAggregations(in);
}
if (in.getVersion().before(Version.V_7_2_0)) {
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
.map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
if (hasAggs && pipelineAggregators.isEmpty() == false) {
List<InternalAggregation> internalAggs = aggregations.asList().stream()
.map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
//Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
//later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
}
// We don't need the PipelineAggregators that come back because we use the ones in the request
in.readNamedWriteableList(PipelineAggregator.class);
}
if (in.readBoolean()) {
suggest = new Suggest(in);
@@ -97,7 +97,8 @@ public void setup() {
@Override
public ReduceContext forPartialReduction() {
reductions.add(false);
return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null);
return InternalAggregation.ReduceContext.forPartialReduction(
BigArrays.NON_RECYCLING_INSTANCE, null, () -> PipelineTree.EMPTY);
}

public ReduceContext forFinalReduction() {