From 38c041ba2753c2556256030eed0efaef3d9dd800 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 9 Apr 2020 13:47:54 -0400 Subject: [PATCH] Remove serialization from pipeline aggregator `master` doesn't need to talk to Elasticsearch versions before 7.8.0 so `PipelineAggregator` doesn't need to be writable *at all* in master. New pipeline aggregations don't need to worry about serializing `PipelineAggregator` at all so this drops all of it. For the most part we don't need to worry about serialization of `PipelineAggregator` at all any more. When backporting a change to an aggregator that is serialized to previous versions of Elasticsearch it *should* be fairly simple to pick a value to serialize. And the compiler *should* tell you that you need to do it. In many cases you this'll be a noop. *Hopefully* all cases. Closes #53742 --- .../elasticsearch/plugins/SearchPlugin.java | 67 ----------- .../elasticsearch/search/SearchModule.java | 39 +------ .../search/aggregations/AggregationPhase.java | 3 +- .../aggregations/InternalAggregation.java | 31 ------ .../aggregations/InternalAggregations.java | 104 +----------------- .../pipeline/AvgBucketPipelineAggregator.java | 14 --- .../BucketMetricsPipelineAggregator.java | 22 ---- .../BucketScriptPipelineAggregator.java | 28 ----- .../BucketSelectorPipelineAggregator.java | 26 ----- .../BucketSortPipelineAggregator.java | 27 ----- .../CumulativeSumPipelineAggregator.java | 21 ---- .../DerivativePipelineAggregator.java | 25 ----- ...ExtendedStatsBucketPipelineAggregator.java | 21 ---- .../pipeline/MaxBucketPipelineAggregator.java | 14 --- .../pipeline/MinBucketPipelineAggregator.java | 14 --- .../pipeline/MovFnPipelineAggregator.java | 35 ------ .../PercentilesBucketPipelineAggregator.java | 23 ---- .../pipeline/PipelineAggregator.java | 57 +--------- .../SerialDiffPipelineAggregator.java | 25 ----- .../pipeline/SiblingPipelineAggregator.java | 9 -- .../StatsBucketPipelineAggregator.java | 11 -- .../pipeline/SumBucketPipelineAggregator.java | 14 --- .../search/query/QuerySearchResult.java | 62 ++--------- .../search/SearchModuleTests.java | 19 +--- .../InternalAggregationsTests.java | 45 +------- .../SignificanceHeuristicTests.java | 4 - .../test/InternalAggregationTestCase.java | 50 --------- .../xpack/analytics/AnalyticsPlugin.java | 2 - ...mulativeCardinalityPipelineAggregator.java | 21 ---- .../CumulativeCardinalityTests.java | 1 - 30 files changed, 23 insertions(+), 811 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index 72ea2821af4dd..95e74b8a64b40 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -368,46 +368,6 @@ public AggregationSpec setAggregatorRegistrar(Consumer agg class PipelineAggregationSpec extends SearchExtensionSpec> { private final Map> resultReaders = new TreeMap<>(); - /** - * Read the aggregator from a stream. - * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire - */ - @Deprecated - private final Writeable.Reader aggregatorReader; - - /** - * Specification of a {@link PipelineAggregator}. - * - * @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it - * is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and - * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. - * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a - * {@link StreamInput} - * @param parser reads the aggregation builder from XContent - */ - public PipelineAggregationSpec(ParseField name, - Writeable.Reader builderReader, - ContextParser parser) { - super(name, builderReader, parser); - this.aggregatorReader = null; - } - - /** - * Specification of a {@link PipelineAggregator}. - * - * @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the - * {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from - * {@link NamedWriteable#getWriteableName()}. - * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a - * {@link StreamInput} - * @param parser reads the aggregation builder from XContent - */ - public PipelineAggregationSpec(String name, - Writeable.Reader builderReader, - ContextParser parser) { - super(name, builderReader, parser); - this.aggregatorReader = null; - } /** * Specification of a {@link PipelineAggregator}. @@ -417,18 +377,12 @@ public PipelineAggregationSpec(String name, * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent - * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for - * pipelines implemented after 7.8.0 */ - @Deprecated public PipelineAggregationSpec(ParseField name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, ContextParser parser) { super(name, builderReader, parser); - this.aggregatorReader = aggregatorReader; } /** @@ -439,18 +393,12 @@ public PipelineAggregationSpec(ParseField name, * {@link NamedWriteable#getWriteableName()}. * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent - * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines - * implemented after 7.8.0 */ - @Deprecated public PipelineAggregationSpec(String name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, ContextParser parser) { super(name, builderReader, parser); - this.aggregatorReader = aggregatorReader; } /** @@ -461,17 +409,14 @@ public PipelineAggregationSpec(String name, * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent * @deprecated prefer the ctor that takes a {@link ContextParser} */ @Deprecated public PipelineAggregationSpec(ParseField name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, PipelineAggregator.Parser parser) { super(name, builderReader, (p, n) -> parser.parse(n, p)); - this.aggregatorReader = aggregatorReader; } /** @@ -482,16 +427,13 @@ public PipelineAggregationSpec(ParseField name, * {@link NamedWriteable#getWriteableName()}. * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @deprecated prefer the ctor that takes a {@link ContextParser} */ @Deprecated public PipelineAggregationSpec(String name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, PipelineAggregator.Parser parser) { super(name, builderReader, (p, n) -> parser.parse(n, p)); - this.aggregatorReader = aggregatorReader; } /** @@ -510,15 +452,6 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R return this; } - /** - * Read the aggregator from a stream. - * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire - */ - @Deprecated - public Writeable.Reader getAggregatorReader() { - return aggregatorReader; - } - /** * Get the readers that must be registered for this aggregation's results. */ diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 34ad0fb6b4ead..167477d9a0ba8 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -193,20 +193,13 @@ import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketParser; import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; import org.elasticsearch.search.aggregations.pipeline.InternalDerivative; import org.elasticsearch.search.aggregations.pipeline.InternalExtendedStatsBucket; @@ -214,31 +207,23 @@ import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.InternalStatsBucket; import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.MinBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSubPhase; -import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase; import org.elasticsearch.search.fetch.subphase.ExplainPhase; +import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase; +import org.elasticsearch.search.fetch.subphase.FetchScorePhase; import org.elasticsearch.search.fetch.subphase.FetchSourcePhase; +import org.elasticsearch.search.fetch.subphase.FetchVersionPhase; import org.elasticsearch.search.fetch.subphase.MatchedQueriesPhase; -import org.elasticsearch.search.fetch.subphase.FetchScorePhase; import org.elasticsearch.search.fetch.subphase.ScriptFieldsPhase; import org.elasticsearch.search.fetch.subphase.SeqNoPrimaryTermPhase; -import org.elasticsearch.search.fetch.subphase.FetchVersionPhase; import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase; import org.elasticsearch.search.fetch.subphase.highlight.Highlighter; @@ -502,82 +487,68 @@ private void registerPipelineAggregations(List plugins) { registerPipelineAggregation(new PipelineAggregationSpec( DerivativePipelineAggregationBuilder.NAME, DerivativePipelineAggregationBuilder::new, - DerivativePipelineAggregator::new, DerivativePipelineAggregationBuilder::parse) .addResultReader(InternalDerivative::new)); registerPipelineAggregation(new PipelineAggregationSpec( MaxBucketPipelineAggregationBuilder.NAME, MaxBucketPipelineAggregationBuilder::new, - MaxBucketPipelineAggregator::new, MaxBucketPipelineAggregationBuilder.PARSER) // This bucket is used by many pipeline aggreations. .addResultReader(InternalBucketMetricValue.NAME, InternalBucketMetricValue::new)); registerPipelineAggregation(new PipelineAggregationSpec( MinBucketPipelineAggregationBuilder.NAME, MinBucketPipelineAggregationBuilder::new, - MinBucketPipelineAggregator::new, MinBucketPipelineAggregationBuilder.PARSER) /* Uses InternalBucketMetricValue */); registerPipelineAggregation(new PipelineAggregationSpec( AvgBucketPipelineAggregationBuilder.NAME, AvgBucketPipelineAggregationBuilder::new, - AvgBucketPipelineAggregator::new, AvgBucketPipelineAggregationBuilder.PARSER) // This bucket is used by many pipeline aggreations. .addResultReader(InternalSimpleValue.NAME, InternalSimpleValue::new)); registerPipelineAggregation(new PipelineAggregationSpec( SumBucketPipelineAggregationBuilder.NAME, SumBucketPipelineAggregationBuilder::new, - SumBucketPipelineAggregator::new, SumBucketPipelineAggregationBuilder.PARSER) /* Uses InternalSimpleValue */); registerPipelineAggregation(new PipelineAggregationSpec( StatsBucketPipelineAggregationBuilder.NAME, StatsBucketPipelineAggregationBuilder::new, - StatsBucketPipelineAggregator::new, StatsBucketPipelineAggregationBuilder.PARSER) .addResultReader(InternalStatsBucket::new)); registerPipelineAggregation(new PipelineAggregationSpec( ExtendedStatsBucketPipelineAggregationBuilder.NAME, ExtendedStatsBucketPipelineAggregationBuilder::new, - ExtendedStatsBucketPipelineAggregator::new, new ExtendedStatsBucketParser()) .addResultReader(InternalExtendedStatsBucket::new)); registerPipelineAggregation(new PipelineAggregationSpec( PercentilesBucketPipelineAggregationBuilder.NAME, PercentilesBucketPipelineAggregationBuilder::new, - PercentilesBucketPipelineAggregator::new, PercentilesBucketPipelineAggregationBuilder.PARSER) .addResultReader(InternalPercentilesBucket::new)); registerPipelineAggregation(new PipelineAggregationSpec( CumulativeSumPipelineAggregationBuilder.NAME, CumulativeSumPipelineAggregationBuilder::new, - CumulativeSumPipelineAggregator::new, CumulativeSumPipelineAggregationBuilder::parse)); registerPipelineAggregation(new PipelineAggregationSpec( BucketScriptPipelineAggregationBuilder.NAME, BucketScriptPipelineAggregationBuilder::new, - BucketScriptPipelineAggregator::new, BucketScriptPipelineAggregationBuilder.PARSER)); registerPipelineAggregation(new PipelineAggregationSpec( BucketSelectorPipelineAggregationBuilder.NAME, BucketSelectorPipelineAggregationBuilder::new, - BucketSelectorPipelineAggregator::new, BucketSelectorPipelineAggregationBuilder::parse)); registerPipelineAggregation(new PipelineAggregationSpec( BucketSortPipelineAggregationBuilder.NAME, BucketSortPipelineAggregationBuilder::new, - BucketSortPipelineAggregator::new, BucketSortPipelineAggregationBuilder::parse)); registerPipelineAggregation(new PipelineAggregationSpec( SerialDiffPipelineAggregationBuilder.NAME, SerialDiffPipelineAggregationBuilder::new, - SerialDiffPipelineAggregator::new, SerialDiffPipelineAggregationBuilder::parse)); registerPipelineAggregation(new PipelineAggregationSpec( MovFnPipelineAggregationBuilder.NAME, MovFnPipelineAggregationBuilder::new, - MovFnPipelineAggregator::new, MovFnPipelineAggregationBuilder.PARSER)); registerFromPlugin(plugins, SearchPlugin::getPipelineAggregations, this::registerPipelineAggregation); @@ -588,10 +559,6 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) { (p, c) -> spec.getParser().parse(p, (String) c))); namedWriteables.add( new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader())); - if (spec.getAggregatorReader() != null) { - namedWriteables.add(new NamedWriteableRegistry.Entry( - PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); - } for (Map.Entry> resultReader : spec.getResultReaders().entrySet()) { namedWriteables .add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue())); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 64c2785c54ba4..9f91dedf30703 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -130,8 +130,7 @@ public void execute(SearchContext context) { throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); } } - context.queryResult().aggregations(new InternalAggregations(aggregations, - context.request().source().aggregations()::buildPipelineTree)); + context.queryResult().aggregations(new InternalAggregations(aggregations)); // disable aggregations so that they don't run on next pages in case of scrolling context.aggregations(null); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 8c29f8a4422b6..df3971e07a305 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.search.aggregations; -import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -146,8 +145,6 @@ public void consumeBucketsAndMaybeBreak(int size) { protected final Map metadata; - private List pipelineAggregatorsForBwcSerialization; - /** * Constructs an aggregation result with a given name. * @@ -158,37 +155,18 @@ protected InternalAggregation(String name, Map metadata) { this.metadata = metadata; } - /** - * Merge a {@linkplain PipelineAggregator.PipelineTree} into this - * aggregation result tree before serializing to a node older than - * 7.8.0. - */ - public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) { - pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators(); - forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree)); - } - - /** * Read from a stream. */ protected InternalAggregation(StreamInput in) throws IOException { name = in.readString(); metadata = in.readMap(); - if (in.getVersion().before(Version.V_7_8_0)) { - in.readNamedWriteableList(PipelineAggregator.class); - } } @Override public final void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeGenericValue(metadata); - if (out.getVersion().before(Version.V_7_8_0)) { - assert pipelineAggregatorsForBwcSerialization != null : - "serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization"; - out.writeNamedWriteableList(pipelineAggregatorsForBwcSerialization); - } doWriteTo(out); } @@ -293,15 +271,6 @@ public Map getMetadata() { return metadata; } - /** - * The {@linkplain PipelineAggregator}s sent to older versions of Elasticsearch. - * @deprecated only use these for serializing to older Elasticsearch versions - */ - @Deprecated - public List pipelineAggregatorsForBwcSerialization() { - return pipelineAggregatorsForBwcSerialization; - } - @Override public String getType() { return getWriteableName(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 15f4c94fa9fc9..72e6cf96760c7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.search.aggregations; -import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -35,13 +34,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toList; - /** * An internal implementation of {@link Aggregations}. */ @@ -59,105 +53,29 @@ public final class InternalAggregations extends Aggregations implements Writeabl } }; - /** - * The way to build a tree of pipeline aggregators. Used only for - * serialization backwards compatibility. - */ - private final Supplier pipelineTreeForBwcSerialization; - /** * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); - this.pipelineTreeForBwcSerialization = null; - } - - /** - * Constructs a node in the aggregation tree. - * @param pipelineTreeSource must be null inside the tree or after final reduction. Should reference the - * search request otherwise so we can properly serialize the response to - * versions of Elasticsearch that require the pipelines to be serialized. - */ - public InternalAggregations(List aggregations, Supplier pipelineTreeSource) { - super(aggregations); - this.pipelineTreeForBwcSerialization = pipelineTreeSource; } public InternalAggregations(StreamInput in) throws IOException { super(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class))); - if (in.getVersion().before(Version.V_7_8_0)) { - in.readNamedWriteableList(PipelineAggregator.class); - } - /* - * Setting the pipeline tree source to null is here is correct but - * only because we don't immediately pass the InternalAggregations - * off to another node. Instead, we always reduce together with - * many aggregations and that always adds the tree read from the - * current request. - */ - pipelineTreeForBwcSerialization = null; } @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().before(Version.V_7_8_0)) { - if (pipelineTreeForBwcSerialization == null) { - out.writeNamedWriteableList(getInternalAggregations()); - out.writeNamedWriteableList(emptyList()); - } else { - PipelineAggregator.PipelineTree pipelineTree = pipelineTreeForBwcSerialization.get(); - mergePipelineTreeForBWCSerialization(pipelineTree); - out.writeNamedWriteableList(getInternalAggregations()); - out.writeNamedWriteableList(pipelineTree.aggregators()); - } - } else { - out.writeNamedWriteableList(getInternalAggregations()); - } - } - - /** - * Merge a {@linkplain PipelineAggregator.PipelineTree} into this - * aggregation result tree before serializing to a node older than - * 7.8.0. - */ - public void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) { - getInternalAggregations().stream().forEach(agg -> { - agg.mergePipelineTreeForBWCSerialization(pipelineTree.subTree(agg.getName())); - }); + out.writeNamedWriteableList(getInternalAggregations()); } /** * Make a mutable copy of the aggregation results. - *

- * IMPORTANT: The copy doesn't include any pipeline aggregations, if there are any. */ public List copyResults() { return new ArrayList<>(getInternalAggregations()); } - /** - * Get the top level pipeline aggregators. - * @deprecated these only exist for BWC serialization - */ - @Deprecated - public List getTopLevelPipelineAggregators() { - if (pipelineTreeForBwcSerialization == null) { - return emptyList(); - } - return pipelineTreeForBwcSerialization.get().aggregators().stream() - .map(p -> (SiblingPipelineAggregator) p) - .collect(toList()); - } - - /** - * Get the transient pipeline tree used to serialize pipeline aggregators to old nodes. - */ - @Deprecated - Supplier getPipelineTreeForBwcSerialization() { - return pipelineTreeForBwcSerialization; - } - @SuppressWarnings("unchecked") private List getInternalAggregations() { return (List) aggregations; @@ -186,8 +104,7 @@ public double sortValue(AggregationPath.PathElement head, Iterator aggregationsList, ReduceContext context) { - InternalAggregations reduced = reduce(aggregationsList, context, - reducedAggregations -> new InternalAggregations(reducedAggregations, context.pipelineTreeForBwcSerialization())); + InternalAggregations reduced = reduce(aggregationsList, context); if (reduced == null) { return null; } @@ -213,13 +130,8 @@ public static InternalAggregations topLevelReduce(List agg * {@link InternalAggregations} object found in the list. * Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled * separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)} - * @param ctor used to build the {@link InternalAggregations}. The top level reduce specifies a constructor - * that adds pipeline aggregation information that is used to send pipeline aggregations to - * older versions of Elasticsearch that require the pipeline aggregations to be returned - * as part of the aggregation tree */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context, - Function, InternalAggregations> ctor) { + public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -245,14 +157,6 @@ public static InternalAggregations reduce(List aggregation reducedAggregations.add(first.reduce(aggregations, context)); } - return ctor.apply(reducedAggregations); - } - - /** - * Version of {@link #reduce(List, ReduceContext, Function)} for nodes inside the aggregation tree. - */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { - return reduce(aggregationsList, context, InternalAggregations::new); + return new InternalAggregations(reducedAggregations); } - } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java index 8a6bbdcc14bd8..9843ab65485f4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java @@ -19,12 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; public class AvgBucketPipelineAggregator extends BucketMetricsPipelineAggregator { @@ -36,18 +34,6 @@ public class AvgBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, format, metadata); } - /** - * Read from a stream. - */ - public AvgBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return AvgBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { count = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java index aa45cf14e22a8..42d13c0ac5552 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; @@ -30,7 +28,6 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.support.AggregationPath; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -50,25 +47,6 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg this.format = format; } - /** - * Read from a stream. - */ - BucketMetricsPipelineAggregator(StreamInput in) throws IOException { - super(in); - format = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - } - - @Override - public final void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - gapPolicy.writeTo(out); - innerWriteTo(out); - } - - protected void innerWriteTo(StreamOutput out) throws IOException { - } - @Override public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) { preCollection(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java index 7d9a0395616ed..e0bb8224202c1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.script.BucketAggregationScript; import org.elasticsearch.script.Script; import org.elasticsearch.search.DocValueFormat; @@ -30,7 +28,6 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -55,31 +52,6 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } - /** - * Read from a stream. - */ - @SuppressWarnings("unchecked") - public BucketScriptPipelineAggregator(StreamInput in) throws IOException { - super(in); - script = new Script(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - bucketsPathsMap = (Map) in.readGenericValue(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeGenericValue(bucketsPathsMap); - } - - @Override - public String getWriteableName() { - return BucketScriptPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation originalAgg = diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java index a17e710c75456..5bd8ec16b7340 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.script.BucketAggregationSelectorScript; import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -28,7 +26,6 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -49,29 +46,6 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } - /** - * Read from a stream. - */ - @SuppressWarnings("unchecked") - public BucketSelectorPipelineAggregator(StreamInput in) throws IOException { - super(in); - script = new Script(in); - gapPolicy = GapPolicy.readFrom(in); - bucketsPathsMap = (Map) in.readGenericValue(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - gapPolicy.writeTo(out); - out.writeGenericValue(bucketsPathsMap); - } - - @Override - public String getWriteableName() { - return BucketSelectorPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation originalAgg = diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java index b639a384c7691..b4d7ec4bf5acf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; @@ -29,7 +27,6 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -52,30 +49,6 @@ public class BucketSortPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } - /** - * Read from a stream. - */ - public BucketSortPipelineAggregator(StreamInput in) throws IOException { - super(in); - sorts = in.readList(FieldSortBuilder::new); - from = in.readVInt(); - size = in.readOptionalVInt(); - gapPolicy = GapPolicy.readFrom(in); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeList(sorts); - out.writeVInt(from); - out.writeOptionalVInt(size); - gapPolicy.writeTo(out); - } - - @Override - public String getWriteableName() { - return BucketSortPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation originalAgg = diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java index 5af6fb5e36ca9..e1448954012b0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -30,7 +28,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -48,24 +45,6 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator { this.formatter = formatter; } - /** - * Read from a stream. - */ - public CumulativeSumPipelineAggregator(StreamInput in) throws IOException { - super(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - } - - @Override - public String getWriteableName() { - return CumulativeSumPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/DerivativePipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/DerivativePipelineAggregator.java index d603202586d6f..f7d121aae9d5a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/DerivativePipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/DerivativePipelineAggregator.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -30,7 +28,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -52,28 +49,6 @@ public class DerivativePipelineAggregator extends PipelineAggregator { this.xAxisUnits = xAxisUnits == null ? null : (double) xAxisUnits; } - /** - * Read from a stream. - */ - public DerivativePipelineAggregator(StreamInput in) throws IOException { - super(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - xAxisUnits = in.readOptionalDouble(); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeOptionalDouble(xAxisUnits); - } - - @Override - public String getWriteableName() { - return DerivativePipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java index 28d3804954733..5786a5f59350b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java @@ -19,13 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { @@ -42,24 +39,6 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline this.sigma = sigma; } - /** - * Read from a stream. - */ - public ExtendedStatsBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - sigma = in.readDouble(); - } - - @Override - protected void innerWriteTo(StreamOutput out) throws IOException { - out.writeDouble(sigma); - } - - @Override - public String getWriteableName() { - return ExtendedStatsBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { sum = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java index 9f837124dd497..a36847147b54c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java @@ -19,12 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -38,18 +36,6 @@ public class MaxBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, formatter, metadata); } - /** - * Read from a stream. - */ - public MaxBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return MaxBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { maxBucketKeys = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java index 0b234bdd74000..a49ca7aa6d95b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java @@ -19,12 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -38,18 +36,6 @@ public class MinBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, formatter, metadata); } - /** - * Read from a stream. - */ - public MinBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return MinBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { minBucketKeys = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 00fef79daebd2..cf3dcea230a4f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -19,9 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.Version; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.script.Script; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -30,7 +27,6 @@ import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -76,37 +72,6 @@ public class MovFnPipelineAggregator extends PipelineAggregator { this.shift = shift; } - public MovFnPipelineAggregator(StreamInput in) throws IOException { - super(in); - script = new Script(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = BucketHelpers.GapPolicy.readFrom(in); - bucketsPath = in.readString(); - window = in.readInt(); - if (in.getVersion().onOrAfter(Version.V_7_4_0)) { - shift = in.readInt(); - } else { - shift = 0; - } - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeString(bucketsPath); - out.writeInt(window); - if (out.getVersion().onOrAfter(Version.V_7_4_0)) { - out.writeInt(shift); - } - } - - @Override - public String getWriteableName() { - return MovFnPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, InternalAggregation.ReduceContext reduceContext) { InternalMultiBucketAggregation diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java index 969d709113984..edfa6c49b3efa 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java @@ -19,13 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -44,26 +41,6 @@ public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAg this.keyed = keyed; } - /** - * Read from a stream. - */ - public PercentilesBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - percents = in.readDoubleArray(); - keyed = in.readBoolean(); - } - - @Override - public void innerWriteTo(StreamOutput out) throws IOException { - out.writeDoubleArray(percents); - out.writeBoolean(keyed); - } - - @Override - public String getWriteableName() { - return PercentilesBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { data = new ArrayList<>(1024); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index a8a01df575267..d0240bedf6604 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -20,11 +20,7 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.NamedWriteable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -37,7 +33,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -public abstract class PipelineAggregator implements NamedWriteable { +public abstract class PipelineAggregator { /** * Parse the {@link PipelineAggregationBuilder} from a {@link XContentParser}. */ @@ -112,57 +108,6 @@ protected PipelineAggregator(String name, String[] bucketsPaths, Map diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index 794dc4d85693d..b5b3dcfd1cb37 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -19,13 +19,11 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -34,13 +32,6 @@ protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java index 345560e95de60..483fa2dca8c42 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java @@ -19,12 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator { @@ -38,15 +36,6 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat super(name, bucketsPaths, gapPolicy, formatter, metadata); } - public StatsBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return StatsBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { sum = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java index 4b4a75224c25a..40f702e6b65a3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java @@ -19,12 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; public class SumBucketPipelineAggregator extends BucketMetricsPipelineAggregator { @@ -35,18 +33,6 @@ public class SumBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, formatter, metadata); } - /** - * Read from a stream. - */ - public SumBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return SumBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { sum = 0; diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 6fba4647343ac..ed6f49d8fd940 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -19,12 +19,6 @@ package org.elasticsearch.search.query; -import static java.util.Collections.emptyList; -import static org.elasticsearch.common.lucene.Lucene.readTopDocs; -import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; - -import java.io.IOException; - import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; @@ -37,11 +31,15 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContextId; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.suggest.Suggest; +import java.io.IOException; + +import static org.elasticsearch.common.lucene.Lucene.readTopDocs; +import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; + public final class QuerySearchResult extends SearchPhaseResult { private int from; @@ -317,18 +315,8 @@ public void readFromWithId(SearchContextId id, StreamInput in) throws IOExceptio } } setTopDocs(readTopDocs(in)); - if (in.getVersion().before(Version.V_7_7_0)) { - if (hasAggs = in.readBoolean()) { - aggregations = DelayableWriteable.referencing(new InternalAggregations(in)); - } - if (in.getVersion().before(Version.V_7_2_0)) { - // The list of PipelineAggregators is sent by old versions. We don't need it anyway. - in.readNamedWriteableList(PipelineAggregator.class); - } - } else { - if (hasAggs = in.readBoolean()) { - aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); - } + if (hasAggs = in.readBoolean()) { + aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); } if (in.readBoolean()) { suggest = new Suggest(in); @@ -364,41 +352,7 @@ public void writeToNoId(StreamOutput out) throws IOException { } } writeTopDocs(out, topDocsAndMaxScore); - if (aggregations == null) { - out.writeBoolean(false); - if (out.getVersion().before(Version.V_7_2_0)) { - /* - * Earlier versions expect sibling pipeline aggs separately - * as they used to be set to QuerySearchResult directly, while - * later versions expect them in InternalAggregations. Note - * that despite serializing sibling pipeline aggs as part of - * InternalAggregations is supported since 6.7.0, the shards - * set sibling pipeline aggs to InternalAggregations only from - * 7.1 on. - */ - out.writeNamedWriteableList(emptyList()); - } - } else { - out.writeBoolean(true); - if (out.getVersion().before(Version.V_7_7_0)) { - InternalAggregations aggs = aggregations.get(); - aggs.writeTo(out); - if (out.getVersion().before(Version.V_7_2_0)) { - /* - * Earlier versions expect sibling pipeline aggs separately - * as they used to be set to QuerySearchResult directly, while - * later versions expect them in InternalAggregations. Note - * that despite serializing sibling pipeline aggs as part of - * InternalAggregations is supported since 6.7.0, the shards - * set sibling pipeline aggs to InternalAggregations only from - * 7.1 on. - */ - out.writeNamedWriteableList(aggs.getTopLevelPipelineAggregators()); - } - } else { - aggregations.writeTo(out); - } - } + out.writeOptionalWriteable(aggregations); if (suggest == null) { out.writeBoolean(false); } else { diff --git a/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java b/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java index 133eab0b45060..b16909f071d06 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.InternalDerivative; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; @@ -156,7 +155,6 @@ public List getPipelineAggregations() { return singletonList(new PipelineAggregationSpec( DerivativePipelineAggregationBuilder.NAME, DerivativePipelineAggregationBuilder::new, - DerivativePipelineAggregator::new, DerivativePipelineAggregationBuilder::parse) .addResultReader(InternalDerivative::new)); } @@ -279,7 +277,7 @@ public void testRegisterPipelineAggregation() { @Override public List getPipelineAggregations() { return singletonList(new PipelineAggregationSpec("test", - TestPipelineAggregationBuilder::new, TestPipelineAggregator::new, TestPipelineAggregationBuilder::fromXContent)); + TestPipelineAggregationBuilder::new, TestPipelineAggregationBuilder::fromXContent)); } })); @@ -456,19 +454,8 @@ protected void validate(ValidationContext context) {} * Dummy test {@link PipelineAggregator} used to test registering aggregation builders. */ private static class TestPipelineAggregator extends PipelineAggregator { - /** - * Read from a stream. - */ - TestPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - @Override - public String getWriteableName() { - return "test"; - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { + TestPipelineAggregator() { + super("test", new String[] {}, null); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index e98fe4a6249b8..a2e99a9ed1d9a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -29,12 +29,9 @@ import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests; -import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.VersionUtils; @@ -46,7 +43,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; -import static org.hamcrest.Matchers.equalTo; public class InternalAggregationsTests extends ESTestCase { @@ -65,7 +61,6 @@ public void testNonFinalReduceTopLevelPipelineAggs() { 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); List aggs = singletonList(new InternalAggregations(Collections.singletonList(terms))); InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction()); - assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -76,7 +71,6 @@ public void testFinalReduceTopLevelPipelineAggs() { InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), maxBucketReduceContext().forFinalReduction()); - assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); } @@ -88,10 +82,6 @@ private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() { } public static InternalAggregations createTestInstance() throws Exception { - return createTestInstance(randomPipelineTree()); - } - - public static InternalAggregations createTestInstance(PipelineAggregator.PipelineTree pipelineTree) throws Exception { List aggsList = new ArrayList<>(); if (randomBoolean()) { StringTermsTests stringTermsTests = new StringTermsTests(); @@ -108,23 +98,7 @@ public static InternalAggregations createTestInstance(PipelineAggregator.Pipelin InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); aggsList.add(simpleValueTests.createTestInstance()); } - return new InternalAggregations(aggsList, () -> pipelineTree); - } - - private static PipelineAggregator.PipelineTree randomPipelineTree() { - List topLevelPipelineAggs = new ArrayList<>(); - if (randomBoolean()) { - if (randomBoolean()) { - topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); - } - if (randomBoolean()) { - topLevelPipelineAggs.add((SiblingPipelineAggregator)new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); - } - if (randomBoolean()) { - topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); - } - } - return new PipelineAggregator.PipelineTree(emptyMap(), topLevelPipelineAggs); + return new InternalAggregations(aggsList); } public void testSerialization() throws Exception { @@ -132,14 +106,8 @@ public void testSerialization() throws Exception { writeToAndReadFrom(aggregations, 0); } - public void testGetTopLevelPipelineAggregators() throws Exception { - PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(); - InternalAggregations aggs = createTestInstance(pipelineTree); - assertThat(aggs.getTopLevelPipelineAggregators(), equalTo(pipelineTree.aggregators())); - } - private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { - Version version = VersionUtils.randomVersion(random()); + Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(version); aggregations.writeTo(out); @@ -148,14 +116,7 @@ private void writeToAndReadFrom(InternalAggregations aggregations, int iteration InternalAggregations deserialized = new InternalAggregations(in); assertEquals(aggregations.aggregations, deserialized.aggregations); if (iteration < 2) { - /* - * Add the pipeline tree for bwc serialization just like we - * do when we merge the aggregation. Without that we can't - * properly serialize to older versions. - */ - InternalAggregations asThoughReduced = new InternalAggregations( - deserialized.copyResults(), aggregations.getPipelineTreeForBwcSerialization()); - writeToAndReadFrom(asThoughReduced, iteration + 1); + writeToAndReadFrom(deserialized, iteration + 1); } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java index 172f740c54290..39a51ee5db475 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java @@ -46,7 +46,6 @@ import org.elasticsearch.search.aggregations.bucket.significant.heuristics.MutualInformation; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.PercentageScore; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.TestSearchContext; @@ -102,9 +101,6 @@ public void testStreamResponse() throws Exception { ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); out.setVersion(version); - if (version.before(Version.V_7_8_0)) { - sigTerms.mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree.EMPTY); - } out.writeNamedWriteable(sigTerms); // read diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 1002515fd569b..adb4793d83450 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -126,12 +126,10 @@ import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue; import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; @@ -142,7 +140,6 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import java.io.IOException; import java.util.ArrayList; @@ -156,7 +153,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.countInnerBucket; @@ -400,52 +396,6 @@ public final void testFromXContentWithRandomFields() throws IOException { assertFromXContent(aggregation, parsedAggregation); } - public void testMergePipelineTreeForBWCSerialization() { - T agg = createTestInstance(); - PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(agg); - agg.mergePipelineTreeForBWCSerialization(pipelineTree); - assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree); - } - - public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) { - Map subTree = new HashMap<>(); - aggregation.forEachBucket(bucketAggs -> { - for (Aggregation subAgg : bucketAggs) { - if (subTree.containsKey(subAgg.getName())) { - continue; - } - subTree.put(subAgg.getName(), randomPipelineTree((InternalAggregation) subAgg)); - } - }); - return new PipelineAggregator.PipelineTree(emptyMap(), randomPipelineAggregators()); - } - - public static List randomPipelineAggregators() { - List pipelines = new ArrayList<>(); - if (randomBoolean()) { - if (randomBoolean()) { - pipelines.add(new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); - } - if (randomBoolean()) { - pipelines.add(new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); - } - if (randomBoolean()) { - pipelines.add(new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); - } - } - return pipelines; - } - - @SuppressWarnings("deprecation") - private void assertMergedPipelineTreeForBWCSerialization(InternalAggregation agg, PipelineAggregator.PipelineTree pipelineTree) { - assertThat(agg.pipelineAggregatorsForBwcSerialization(), equalTo(pipelineTree.aggregators())); - agg.forEachBucket(bucketAggs -> { - for (Aggregation subAgg : bucketAggs) { - assertMergedPipelineTreeForBWCSerialization((InternalAggregation) subAgg, pipelineTree.subTree(subAgg.getName())); - } - }); - } - protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException; @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java index 7d006efefa2d5..28cddee7a9045 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java @@ -34,7 +34,6 @@ import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder; import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot; import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; -import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator; import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper; import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats; import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder; @@ -74,7 +73,6 @@ public List getPipelineAggregations() { new PipelineAggregationSpec( CumulativeCardinalityPipelineAggregationBuilder.NAME, CumulativeCardinalityPipelineAggregationBuilder::new, - CumulativeCardinalityPipelineAggregator::new, usage.track(AnalyticsUsage.Item.CUMULATIVE_CARDINALITY, checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER))) ); diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java index 7e8ad20b0f102..331b11a79241b 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.analytics.cumulativecardinality; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -22,7 +20,6 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -37,24 +34,6 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator this.formatter = formatter; } - /** - * Read from a stream. - */ - public CumulativeCardinalityPipelineAggregator(StreamInput in) throws IOException { - super(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - } - - @Override - public String getWriteableName() { - return CumulativeCardinalityPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityTests.java index d9ff43e7d06c8..b7c523c0bad22 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityTests.java @@ -32,7 +32,6 @@ public List getPipelineAggregations() { return singletonList(new PipelineAggregationSpec( CumulativeCardinalityPipelineAggregationBuilder.NAME, CumulativeCardinalityPipelineAggregationBuilder::new, - CumulativeCardinalityPipelineAggregator::new, CumulativeCardinalityPipelineAggregationBuilder.PARSER)); } });