Skip to content

Commit ea6152c

Browse files
authored
Deprecate serializing PipelineAggregators (#54926)
`PipelineAggregator`s are only sent across the wire for backwards compatibility with 7.7.0. `PipelineAggregator` needs to continue to implement `NamedWriteable` for backwards compatibility but pipeline aggregations created after 7.7.0 need not implement any of the methods in that interface because we'll never attempt to call them. So this creates implementations in `PipelineAggregator` (the base class) that just throw exceptions.
1 parent b5c5002 commit ea6152c

File tree

4 files changed

+104
-29
lines changed

4 files changed

+104
-29
lines changed

server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,47 @@ public AggregationSpec setAggregatorRegistrar(Consumer<ValuesSourceRegistry> agg
368368
class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBuilder,
369369
ContextParser<String, ? extends PipelineAggregationBuilder>> {
370370
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
371+
/**
372+
* Read the aggregator from a stream.
373+
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
374+
*/
375+
@Deprecated
371376
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;
372377

378+
/**
379+
* Specification of a {@link PipelineAggregator}.
380+
*
381+
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
382+
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
383+
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
384+
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
385+
* {@link StreamInput}
386+
* @param parser reads the aggregation builder from XContent
387+
*/
388+
public PipelineAggregationSpec(ParseField name,
389+
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
390+
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
391+
super(name, builderReader, parser);
392+
this.aggregatorReader = null;
393+
}
394+
395+
/**
396+
* Specification of a {@link PipelineAggregator}.
397+
*
398+
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
399+
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
400+
* {@link NamedWriteable#getWriteableName()}.
401+
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
402+
* {@link StreamInput}
403+
* @param parser reads the aggregation builder from XContent
404+
*/
405+
public PipelineAggregationSpec(String name,
406+
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
407+
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
408+
super(name, builderReader, parser);
409+
this.aggregatorReader = null;
410+
}
411+
373412
/**
374413
* Specification of a {@link PipelineAggregator}.
375414
*
@@ -380,7 +419,10 @@ class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBui
380419
* {@link StreamInput}
381420
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
382421
* @param parser reads the aggregation builder from XContent
422+
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
423+
* pipelines implemented after 7.8.0
383424
*/
425+
@Deprecated
384426
public PipelineAggregationSpec(ParseField name,
385427
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
386428
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
@@ -399,7 +441,10 @@ public PipelineAggregationSpec(ParseField name,
399441
* {@link StreamInput}
400442
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
401443
* @param parser reads the aggregation builder from XContent
444+
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
445+
* implemented after 7.8.0
402446
*/
447+
@Deprecated
403448
public PipelineAggregationSpec(String name,
404449
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
405450
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
@@ -466,8 +511,10 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R
466511
}
467512

468513
/**
469-
* The reader for the {@link PipelineAggregator}.
514+
* Read the aggregator from a stream.
515+
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
470516
*/
517+
@Deprecated
471518
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
472519
return aggregatorReader;
473520
}

server/src/main/java/org/elasticsearch/search/SearchModule.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,8 +588,10 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) {
588588
(p, c) -> spec.getParser().parse(p, (String) c)));
589589
namedWriteables.add(
590590
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
591-
namedWriteables.add(
592-
new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
591+
if (spec.getAggregatorReader() != null) {
592+
namedWriteables.add(new NamedWriteableRegistry.Entry(
593+
PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
594+
}
593595
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
594596
namedWriteables
595597
.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()));

server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
import static java.util.stream.Collectors.toList;
5858
import static java.util.stream.Collectors.toMap;
5959

60+
/**
61+
* An immutable collection of {@link AggregatorFactories}.
62+
*/
6063
public class AggregatorFactories {
6164
public static final Pattern VALID_AGG_NAME = Pattern.compile("[^\\[\\]>]+");
6265

@@ -155,26 +158,16 @@ private static AggregatorFactories.Builder parseAggregators(XContentParser parse
155158
return factories;
156159
}
157160

158-
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>());
161+
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0]);
159162

160163
private AggregatorFactory[] factories;
161-
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;
162164

163165
public static Builder builder() {
164166
return new Builder();
165167
}
166168

167-
private AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregationBuilder> pipelineAggregators) {
169+
private AggregatorFactories(AggregatorFactory[] factories) {
168170
this.factories = factories;
169-
this.pipelineAggregatorFactories = pipelineAggregators;
170-
}
171-
172-
public List<PipelineAggregator> createPipelineAggregators() {
173-
List<PipelineAggregator> pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size());
174-
for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) {
175-
pipelineAggregators.add(factory.create());
176-
}
177-
return pipelineAggregators;
178171
}
179172

180173
/**
@@ -216,13 +209,16 @@ public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throw
216209
}
217210

218211
/**
219-
* @return the number of sub-aggregator factories not including pipeline
220-
* aggregator factories
212+
* @return the number of sub-aggregator factories
221213
*/
222214
public int countAggregators() {
223215
return factories.length;
224216
}
225217

218+
/**
219+
* A mutable collection of {@link AggregationBuilder}s and
220+
* {@link PipelineAggregationBuilder}s.
221+
*/
226222
public static class Builder implements Writeable, ToXContentObject {
227223
private final Set<String> names = new HashSet<>();
228224

@@ -333,16 +329,13 @@ public AggregatorFactories build(QueryShardContext queryShardContext, Aggregator
333329
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
334330
return EMPTY;
335331
}
336-
List<PipelineAggregationBuilder> orderedPipelineAggregators =
337-
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
338332
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()];
339-
340333
int i = 0;
341334
for (AggregationBuilder agg : aggregationBuilders) {
342335
aggFactories[i] = agg.build(queryShardContext, parent);
343336
++i;
344337
}
345-
return new AggregatorFactories(aggFactories, orderedPipelineAggregators);
338+
return new AggregatorFactories(aggFactories);
346339
}
347340

348341
private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder(

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.search.aggregations.pipeline;
2121

2222

23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.common.ParseField;
2425
import org.elasticsearch.common.io.stream.NamedWriteable;
2526
import org.elasticsearch.common.io.stream.StreamInput;
@@ -113,22 +114,54 @@ protected PipelineAggregator(String name, String[] bucketsPaths, Map<String, Obj
113114

114115
/**
115116
* Read from a stream.
117+
* @deprecated pipeline aggregations added after 7.8.0 shouldn't call this
116118
*/
119+
@Deprecated
117120
protected PipelineAggregator(StreamInput in) throws IOException {
118-
name = in.readString();
119-
bucketsPaths = in.readStringArray();
120-
metadata = in.readMap();
121+
if (in.getVersion().before(Version.V_7_8_0)) {
122+
name = in.readString();
123+
bucketsPaths = in.readStringArray();
124+
metadata = in.readMap();
125+
} else {
126+
throw new IllegalStateException("Cannot deserialize pipeline [" + getClass() + "] from before 7.8.0");
127+
}
121128
}
122129

130+
/**
131+
* {@inheritDoc}
132+
* @deprecated pipeline aggregations added after 7.8.0 shouldn't call this
133+
*/
123134
@Override
135+
@Deprecated
124136
public final void writeTo(StreamOutput out) throws IOException {
125-
out.writeString(name);
126-
out.writeStringArray(bucketsPaths);
127-
out.writeMap(metadata);
128-
doWriteTo(out);
137+
if (out.getVersion().before(Version.V_7_8_0)) {
138+
out.writeString(name);
139+
out.writeStringArray(bucketsPaths);
140+
out.writeMap(metadata);
141+
doWriteTo(out);
142+
} else {
143+
throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0");
144+
}
145+
}
146+
147+
/**
148+
* Write the body of the aggregation to the wire.
149+
* @deprecated pipeline aggregations added after 7.8.0 don't need to implement this
150+
*/
151+
@Deprecated
152+
protected void doWriteTo(StreamOutput out) throws IOException {
153+
}
154+
155+
/**
156+
* The name of the writeable object.
157+
* @deprecated pipeline aggregations added after 7.8.0 don't need to implement this
158+
*/
159+
@Override
160+
@Deprecated
161+
public String getWriteableName() {
162+
throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0");
129163
}
130164

131-
protected abstract void doWriteTo(StreamOutput out) throws IOException;
132165

133166
public String name() {
134167
return name;

0 commit comments

Comments
 (0)