diff --git a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index fe5c65d670468..e12e2cb604e08 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -349,8 +349,47 @@ public Map> getResultRea class PipelineAggregationSpec extends SearchExtensionSpec> { private final Map> resultReaders = new TreeMap<>(); + /** + * Read the aggregator from a stream. + * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire + */ + @Deprecated private final Writeable.Reader aggregatorReader; + /** + * Specification of a {@link PipelineAggregator}. + * + * @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it + * is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and + * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. + * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a + * {@link StreamInput} + * @param parser reads the aggregation builder from XContent + */ + public PipelineAggregationSpec(ParseField name, + Writeable.Reader builderReader, + ContextParser parser) { + super(name, builderReader, parser); + this.aggregatorReader = null; + } + + /** + * Specification of a {@link PipelineAggregator}. + * + * @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the + * {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from + * {@link NamedWriteable#getWriteableName()}. + * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a + * {@link StreamInput} + * @param parser reads the aggregation builder from XContent + */ + public PipelineAggregationSpec(String name, + Writeable.Reader builderReader, + ContextParser parser) { + super(name, builderReader, parser); + this.aggregatorReader = null; + } + /** * Specification of a {@link PipelineAggregator}. * @@ -361,7 +400,10 @@ class PipelineAggregationSpec extends SearchExtensionSpec builderReader, Writeable.Reader aggregatorReader, @@ -380,7 +422,10 @@ public PipelineAggregationSpec(ParseField name, * {@link StreamInput} * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent + * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines + * implemented after 7.8.0 */ + @Deprecated public PipelineAggregationSpec(String name, Writeable.Reader builderReader, Writeable.Reader aggregatorReader, @@ -447,8 +492,10 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R } /** - * The reader for the {@link PipelineAggregator}. + * Read the aggregator from a stream. + * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire */ + @Deprecated public Writeable.Reader getAggregatorReader() { return aggregatorReader; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 9db914f70da82..5a8457d808341 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -575,8 +575,10 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) { } namedWriteables.add( new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader())); - namedWriteables.add( - new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); + if (spec.getAggregatorReader() != null) { + namedWriteables.add(new NamedWriteableRegistry.Entry( + PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); + } for (Map.Entry> resultReader : spec.getResultReaders().entrySet()) { namedWriteables .add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue())); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 33d18f211f094..47e9fc4df36cc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -57,6 +57,9 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +/** + * An immutable collection of {@link AggregatorFactories}. + */ public class AggregatorFactories { public static final Pattern VALID_AGG_NAME = Pattern.compile("[^\\[\\]>]+"); @@ -155,26 +158,16 @@ private static AggregatorFactories.Builder parseAggregators(XContentParser parse return factories; } - public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0]); private AggregatorFactory[] factories; - private List pipelineAggregatorFactories; public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories, List pipelineAggregators) { + private AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; - this.pipelineAggregatorFactories = pipelineAggregators; - } - - public List createPipelineAggregators() { - List pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size()); - for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) { - pipelineAggregators.add(factory.create()); - } - return pipelineAggregators; } /** @@ -216,13 +209,16 @@ public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throw } /** - * @return the number of sub-aggregator factories not including pipeline - * aggregator factories + * @return the number of sub-aggregator factories */ public int countAggregators() { return factories.length; } + /** + * A mutable collection of {@link AggregationBuilder}s and + * {@link PipelineAggregationBuilder}s. + */ public static class Builder implements Writeable, ToXContentObject { private final Set names = new HashSet<>(); @@ -333,16 +329,13 @@ public AggregatorFactories build(QueryShardContext queryShardContext, Aggregator if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) { return EMPTY; } - List orderedPipelineAggregators = - resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders); AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()]; - int i = 0; for (AggregationBuilder agg : aggregationBuilders) { aggFactories[i] = agg.build(queryShardContext, parent); ++i; } - return new AggregatorFactories(aggFactories, orderedPipelineAggregators); + return new AggregatorFactories(aggFactories); } private List resolvePipelineAggregatorOrder( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index 7444b2119e4c8..a8a01df575267 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -113,22 +114,54 @@ protected PipelineAggregator(String name, String[] bucketsPaths, Map