From 5a5c2e656b6565f6f71c75bd0b0d582ff79e5a4f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 9 Apr 2020 12:01:34 -0400 Subject: [PATCH] Deprecate serializing PipelineAggregators (#54926) `PipelineAggregator`s are only sent across the wire for backwards compatibility with 7.7.0. `PipelineAggregator` needs to continue to implement `NamedWriteable` for backwards compatibility but pipeline aggregations created after 7.7.0 need not implement any of the methods in that interface because we'll never attempt to call them. So this creates implementations in `PipelineAggregator` (the base class) that just throw exceptions. --- .../elasticsearch/plugins/SearchPlugin.java | 49 ++++++++++++++++++- .../elasticsearch/search/SearchModule.java | 6 ++- .../aggregations/AggregatorFactories.java | 29 +++++------ .../pipeline/PipelineAggregator.java | 49 ++++++++++++++++--- 4 files changed, 104 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index fe5c65d670468..e12e2cb604e08 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -349,8 +349,47 @@ public Map> getResultRea class PipelineAggregationSpec extends SearchExtensionSpec> { private final Map> resultReaders = new TreeMap<>(); + /** + * Read the aggregator from a stream. + * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire + */ + @Deprecated private final Writeable.Reader aggregatorReader; + /** + * Specification of a {@link PipelineAggregator}. + * + * @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it + * is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and + * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. + * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a + * {@link StreamInput} + * @param parser reads the aggregation builder from XContent + */ + public PipelineAggregationSpec(ParseField name, + Writeable.Reader builderReader, + ContextParser parser) { + super(name, builderReader, parser); + this.aggregatorReader = null; + } + + /** + * Specification of a {@link PipelineAggregator}. + * + * @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the + * {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from + * {@link NamedWriteable#getWriteableName()}. + * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a + * {@link StreamInput} + * @param parser reads the aggregation builder from XContent + */ + public PipelineAggregationSpec(String name, + Writeable.Reader builderReader, + ContextParser parser) { + super(name, builderReader, parser); + this.aggregatorReader = null; + } + /** * Specification of a {@link PipelineAggregator}. * @@ -361,7 +400,10 @@ class PipelineAggregationSpec extends SearchExtensionSpec builderReader, Writeable.Reader aggregatorReader, @@ -380,7 +422,10 @@ public PipelineAggregationSpec(ParseField name, * {@link StreamInput} * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent + * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines + * implemented after 7.8.0 */ + @Deprecated public PipelineAggregationSpec(String name, Writeable.Reader builderReader, Writeable.Reader aggregatorReader, @@ -447,8 +492,10 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R } /** - * The reader for the {@link PipelineAggregator}. + * Read the aggregator from a stream. + * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire */ + @Deprecated public Writeable.Reader getAggregatorReader() { return aggregatorReader; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 9db914f70da82..5a8457d808341 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -575,8 +575,10 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) { } namedWriteables.add( new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader())); - namedWriteables.add( - new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); + if (spec.getAggregatorReader() != null) { + namedWriteables.add(new NamedWriteableRegistry.Entry( + PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); + } for (Map.Entry> resultReader : spec.getResultReaders().entrySet()) { namedWriteables .add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue())); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 33d18f211f094..47e9fc4df36cc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -57,6 +57,9 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +/** + * An immutable collection of {@link AggregatorFactories}. + */ public class AggregatorFactories { public static final Pattern VALID_AGG_NAME = Pattern.compile("[^\\[\\]>]+"); @@ -155,26 +158,16 @@ private static AggregatorFactories.Builder parseAggregators(XContentParser parse return factories; } - public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0]); private AggregatorFactory[] factories; - private List pipelineAggregatorFactories; public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories, List pipelineAggregators) { + private AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; - this.pipelineAggregatorFactories = pipelineAggregators; - } - - public List createPipelineAggregators() { - List pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size()); - for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) { - pipelineAggregators.add(factory.create()); - } - return pipelineAggregators; } /** @@ -216,13 +209,16 @@ public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throw } /** - * @return the number of sub-aggregator factories not including pipeline - * aggregator factories + * @return the number of sub-aggregator factories */ public int countAggregators() { return factories.length; } + /** + * A mutable collection of {@link AggregationBuilder}s and + * {@link PipelineAggregationBuilder}s. + */ public static class Builder implements Writeable, ToXContentObject { private final Set names = new HashSet<>(); @@ -333,16 +329,13 @@ public AggregatorFactories build(QueryShardContext queryShardContext, Aggregator if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) { return EMPTY; } - List orderedPipelineAggregators = - resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders); AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()]; - int i = 0; for (AggregationBuilder agg : aggregationBuilders) { aggFactories[i] = agg.build(queryShardContext, parent); ++i; } - return new AggregatorFactories(aggFactories, orderedPipelineAggregators); + return new AggregatorFactories(aggFactories); } private List resolvePipelineAggregatorOrder( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index 7444b2119e4c8..a8a01df575267 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -113,22 +114,54 @@ protected PipelineAggregator(String name, String[] bucketsPaths, Map