diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index cffc610900458..5d2fac53f3bd0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.Supplier; @@ -147,6 +148,7 @@ public void consumeBucketsAndMaybeBreak(int size) { protected final Map metaData; private final List pipelineAggregators; + private List pipelineAggregatorsForBwcSerialization; /** * Constructs an aggregation result with a given name. @@ -159,16 +161,25 @@ protected InternalAggregation(String name, List pipelineAggr this.metaData = metaData; } + /** + * Merge a {@linkplain PipelineAggregator.PipelineTree} into this + * aggregation result tree before serializing to a node older than + * 7.8.0. + */ + public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) { + pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators(); + forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree)); + } + /** * Read from a stream. */ protected InternalAggregation(StreamInput in) throws IOException { name = in.readString(); metaData = in.readMap(); + pipelineAggregators = emptyList(); if (in.getVersion().before(Version.V_7_8_0)) { - pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class); - } else { - pipelineAggregators = emptyList(); + in.readNamedWriteableList(PipelineAggregator.class); } } @@ -177,7 +188,9 @@ public final void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeGenericValue(metaData); if (out.getVersion().before(Version.V_7_8_0)) { - out.writeNamedWriteableList(pipelineAggregators); + assert pipelineAggregatorsForBwcSerialization != null : + "serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization"; + out.writeNamedWriteableList(pipelineAggregatorsForBwcSerialization); } doWriteTo(out); } @@ -212,6 +225,11 @@ public InternalAggregation copyWithRewritenBuckets(Function consumer) {} + /** * Creates the output from all pipeline aggs that this aggregation is associated with. Should only * be called after all aggregations have been fully reduced @@ -278,10 +296,23 @@ public Map getMetaData() { return metaData; } + /** + * @deprecated soon to be removed because it is not longer needed + */ + @Deprecated public List pipelineAggregators() { return pipelineAggregators; } + /** + * The {@linkplain PipelineAggregator}s sent to older versions of Elasticsearch. + * @deprecated only use these for serializing to older Elasticsearch versions + */ + @Deprecated + public List pipelineAggregatorsForBwcSerialization() { + return pipelineAggregatorsForBwcSerialization; + } + @Override public String getType() { return getWriteableName(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index c3b5d5ee70038..15f4c94fa9fc9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -100,18 +100,33 @@ public InternalAggregations(StreamInput in) throws IOException { } @Override - @SuppressWarnings("unchecked") public void writeTo(StreamOutput out) throws IOException { - out.writeNamedWriteableList((List)aggregations); if (out.getVersion().before(Version.V_7_8_0)) { if (pipelineTreeForBwcSerialization == null) { + out.writeNamedWriteableList(getInternalAggregations()); out.writeNamedWriteableList(emptyList()); } else { - out.writeNamedWriteableList(pipelineTreeForBwcSerialization.get().aggregators()); + PipelineAggregator.PipelineTree pipelineTree = pipelineTreeForBwcSerialization.get(); + mergePipelineTreeForBWCSerialization(pipelineTree); + out.writeNamedWriteableList(getInternalAggregations()); + out.writeNamedWriteableList(pipelineTree.aggregators()); } + } else { + out.writeNamedWriteableList(getInternalAggregations()); } } + /** + * Merge a {@linkplain PipelineAggregator.PipelineTree} into this + * aggregation result tree before serializing to a node older than + * 7.8.0. + */ + public void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) { + getInternalAggregations().stream().forEach(agg -> { + agg.mergePipelineTreeForBWCSerialization(pipelineTree.subTree(agg.getName())); + }); + } + /** * Make a mutable copy of the aggregation results. *

@@ -135,6 +150,14 @@ public List getTopLevelPipelineAggregators() { .collect(toList()); } + /** + * Get the transient pipeline tree used to serialize pipeline aggregators to old nodes. + */ + @Deprecated + Supplier getPipelineTreeForBwcSerialization() { + return pipelineTreeForBwcSerialization; + } + @SuppressWarnings("unchecked") private List getInternalAggregations() { return (List) aggregations; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index 753648de9af95..af8f294f8f45c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public abstract class InternalMultiBucketAggregation consumer) { + for (B bucket : getBuckets()) { + consumer.accept((InternalAggregations) bucket.getAggregations()); + } + } + private List reducePipelineBuckets(ReduceContext reduceContext, PipelineTree pipelineTree) { List reducedBuckets = new ArrayList<>(); for (B bucket : getBuckets()) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 5df2d1d03ce1d..1f0df9844901a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -181,6 +182,11 @@ public InternalAggregation copyWithRewritenBuckets(Function consumer) { + consumer.accept(aggregations); + } + @Override public boolean equals(Object obj) { if (this == obj) return true; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index e1dacc98789bd..07b13e515aa22 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -88,6 +88,10 @@ private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() { } public static InternalAggregations createTestInstance() throws Exception { + return createTestInstance(randomPipelineTree()); + } + + public static InternalAggregations createTestInstance(PipelineAggregator.PipelineTree pipelineTree) throws Exception { List aggsList = new ArrayList<>(); if (randomBoolean()) { StringTermsTests stringTermsTests = new StringTermsTests(); @@ -104,7 +108,7 @@ public static InternalAggregations createTestInstance() throws Exception { InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); aggsList.add(simpleValueTests.createTestInstance()); } - return new InternalAggregations(aggsList); + return new InternalAggregations(aggsList, () -> pipelineTree); } private static PipelineAggregator.PipelineTree randomPipelineTree() { @@ -129,11 +133,9 @@ public void testSerialization() throws Exception { } public void testGetTopLevelPipelineAggregators() throws Exception { - InternalAggregations orig = createTestInstance(); - PipelineAggregator.PipelineTree tree = randomPipelineTree(); - InternalAggregations withPipelines = new InternalAggregations(orig.copyResults(), () -> tree); - assertThat(withPipelines.aggregations, equalTo(orig.aggregations)); - assertThat(withPipelines.getTopLevelPipelineAggregators(), equalTo(tree.aggregators())); + PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(); + InternalAggregations aggs = createTestInstance(pipelineTree); + assertThat(aggs.getTopLevelPipelineAggregators(), equalTo(pipelineTree.aggregators())); } private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { @@ -146,8 +148,14 @@ private void writeToAndReadFrom(InternalAggregations aggregations, int iteration InternalAggregations deserialized = new InternalAggregations(in); assertEquals(aggregations.aggregations, deserialized.aggregations); if (iteration < 2) { - //serialize this enough times to make sure that we are able to write again what we read - writeToAndReadFrom(deserialized, iteration + 1); + /* + * Add the pipeline tree for bwc serialization just like we + * do when we merge the aggregation. Without that we can't + * properly serialize to older versions. + */ + InternalAggregations asThoughReduced = new InternalAggregations( + deserialized.copyResults(), aggregations.getPipelineTreeForBwcSerialization()); + writeToAndReadFrom(asThoughReduced, iteration + 1); } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java index 222b3f21cb2c2..7d04caf22c89b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java @@ -102,6 +102,9 @@ public void testStreamResponse() throws Exception { ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); out.setVersion(version); + if (version.before(Version.V_7_8_0)) { + sigTerms.mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree.EMPTY); + } out.writeNamedWriteable(sigTerms); // read diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 47a752e099696..56653d57c2db3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -126,10 +126,12 @@ import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue; import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; @@ -140,6 +142,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import java.io.IOException; import java.util.ArrayList; @@ -153,6 +156,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.countInnerBucket; @@ -384,6 +388,52 @@ public final void testFromXContentWithRandomFields() throws IOException { assertFromXContent(aggregation, parsedAggregation); } + public void testMergePipelineTreeForBWCSerialization() { + T agg = createTestInstance(); + PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(agg); + agg.mergePipelineTreeForBWCSerialization(pipelineTree); + assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree); + } + + public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) { + Map subTree = new HashMap<>(); + aggregation.forEachBucket(bucketAggs -> { + for (Aggregation subAgg : bucketAggs) { + if (subTree.containsKey(subAgg.getName())) { + continue; + } + subTree.put(subAgg.getName(), randomPipelineTree((InternalAggregation) subAgg)); + } + }); + return new PipelineAggregator.PipelineTree(emptyMap(), randomPipelineAggregators()); + } + + public static List randomPipelineAggregators() { + List pipelines = new ArrayList<>(); + if (randomBoolean()) { + if (randomBoolean()) { + pipelines.add(new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); + } + if (randomBoolean()) { + pipelines.add(new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); + } + if (randomBoolean()) { + pipelines.add(new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); + } + } + return pipelines; + } + + @SuppressWarnings("deprecation") + private void assertMergedPipelineTreeForBWCSerialization(InternalAggregation agg, PipelineAggregator.PipelineTree pipelineTree) { + assertThat(agg.pipelineAggregatorsForBwcSerialization(), equalTo(pipelineTree.aggregators())); + agg.forEachBucket(bucketAggs -> { + for (Aggregation subAgg : bucketAggs) { + assertMergedPipelineTreeForBWCSerialization((InternalAggregation) subAgg, pipelineTree.subTree(subAgg.getName())); + } + }); + } + protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException; @SuppressWarnings("unchecked")