Skip to content

Commit a5adac0

Browse files
authored
Fix pipeline agg serialization for ccs (backport of #54282) (#54468)
This fixes pipeline aggregations used in cross cluster search from an older version of Elasticsearch to a newer version of Elasticsearch. I broke this in #53730 when I was too aggressive in shutting off serialization of pipeline aggs. In particular, this comes up when the coordinating node is pre-7.8.0 and the gateway node is on or after 7.8.0. The fix is another step down the line to remove pipeline aggregators from the aggregation tree. Sort of. It creates a new `List<PipelineAggregator>` member in `InternalAggregation` *but* it is only used for bwc serialization and it is fed by the mechanism established in #53730 to read the pipelines from the…
1 parent 7467cc0 commit a5adac0

File tree

7 files changed

+153
-18
lines changed

7 files changed

+153
-18
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Objects;
39+
import java.util.function.Consumer;
3940
import java.util.function.Function;
4041
import java.util.function.IntConsumer;
4142
import java.util.function.Supplier;
@@ -147,6 +148,7 @@ public void consumeBucketsAndMaybeBreak(int size) {
147148
protected final Map<String, Object> metadata;
148149

149150
private final List<PipelineAggregator> pipelineAggregators;
151+
private List<PipelineAggregator> pipelineAggregatorsForBwcSerialization;
150152

151153
/**
152154
* Constructs an aggregation result with a given name.
@@ -159,16 +161,25 @@ protected InternalAggregation(String name, List<PipelineAggregator> pipelineAggr
159161
this.metadata = metadata;
160162
}
161163

164+
/**
165+
* Merge a {@linkplain PipelineAggregator.PipelineTree} into this
166+
* aggregation result tree before serializing to a node older than
167+
* 7.8.0.
168+
*/
169+
public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) {
170+
pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators();
171+
forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree));
172+
}
173+
162174
/**
 * Read from a stream.
 */
protected InternalAggregation(StreamInput in) throws IOException {
    this.name = in.readString();
    this.metadata = in.readMap();
    this.pipelineAggregators = emptyList();
    if (in.getVersion().before(Version.V_7_8_0)) {
        // Streams from before 7.8.0 still contain a list of pipeline aggregators here.
        // It has to be consumed so the following reads line up, but the result is dropped.
        in.readNamedWriteableList(PipelineAggregator.class);
    }
}
174185

@@ -177,7 +188,9 @@ public final void writeTo(StreamOutput out) throws IOException {
177188
out.writeString(name);
178189
out.writeGenericValue(metadata);
179190
if (out.getVersion().before(Version.V_7_8_0)) {
180-
out.writeNamedWriteableList(pipelineAggregators);
191+
assert pipelineAggregatorsForBwcSerialization != null :
192+
"serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization";
193+
out.writeNamedWriteableList(pipelineAggregatorsForBwcSerialization);
181194
}
182195
doWriteTo(out);
183196
}
@@ -212,6 +225,11 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
212225
"Aggregation [" + getName() + "] must be a bucket aggregation but was [" + getWriteableName() + "]");
213226
}
214227

228+
/**
229+
* Run a {@linkplain Consumer} over all buckets in this aggregation.
230+
*/
231+
public void forEachBucket(Consumer<InternalAggregations> consumer) {}
232+
215233
/**
216234
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
217235
* be called after all aggregations have been fully reduced
@@ -278,10 +296,23 @@ public Map<String, Object> getMetadata() {
278296
return metadata;
279297
}
280298

299+
/**
300+
* @deprecated soon to be removed because it is not longer needed
301+
*/
302+
@Deprecated
281303
public List<PipelineAggregator> pipelineAggregators() {
282304
return pipelineAggregators;
283305
}
284306

307+
/**
308+
* The {@linkplain PipelineAggregator}s sent to older versions of Elasticsearch.
309+
* @deprecated only use these for serializing to older Elasticsearch versions
310+
*/
311+
@Deprecated
312+
public List<PipelineAggregator> pipelineAggregatorsForBwcSerialization() {
313+
return pipelineAggregatorsForBwcSerialization;
314+
}
315+
285316
@Override
286317
public String getType() {
287318
return getWriteableName();

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
2626
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2727
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
28+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
2829
import org.elasticsearch.search.aggregations.support.AggregationPath;
2930

3031
import java.io.IOException;
@@ -100,18 +101,38 @@ public InternalAggregations(StreamInput in) throws IOException {
100101
}
101102

102103
@Override
103-
@SuppressWarnings("unchecked")
104104
public void writeTo(StreamOutput out) throws IOException {
105-
out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
106-
if (out.getVersion().before(Version.V_7_8_0) && out.getVersion().onOrAfter(Version.V_6_7_0)) {
105+
if (out.getVersion().before(Version.V_7_8_0)) {
107106
if (pipelineTreeForBwcSerialization == null) {
108-
out.writeNamedWriteableList(emptyList());
107+
mergePipelineTreeForBWCSerialization(PipelineTree.EMPTY);
108+
out.writeNamedWriteableList(getInternalAggregations());
109+
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
110+
out.writeNamedWriteableList(emptyList());
111+
}
109112
} else {
110-
out.writeNamedWriteableList(pipelineTreeForBwcSerialization.get().aggregators());
113+
PipelineAggregator.PipelineTree pipelineTree = pipelineTreeForBwcSerialization.get();
114+
mergePipelineTreeForBWCSerialization(pipelineTree);
115+
out.writeNamedWriteableList(getInternalAggregations());
116+
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
117+
out.writeNamedWriteableList(emptyList());
118+
}
111119
}
120+
} else {
121+
out.writeNamedWriteableList(getInternalAggregations());
112122
}
113123
}
114124

125+
/**
126+
* Merge a {@linkplain PipelineAggregator.PipelineTree} into this
127+
* aggregation result tree before serializing to a node older than
128+
* 7.8.0.
129+
*/
130+
public void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) {
131+
getInternalAggregations().stream().forEach(agg -> {
132+
agg.mergePipelineTreeForBWCSerialization(pipelineTree.subTree(agg.getName()));
133+
});
134+
}
135+
115136
/**
116137
* Make a mutable copy of the aggregation results.
117138
* <p>
@@ -135,6 +156,14 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
135156
.collect(toList());
136157
}
137158

159+
/**
160+
* Get the transient pipeline tree used to serialize pipeline aggregators to old nodes.
161+
*/
162+
@Deprecated
163+
Supplier<PipelineAggregator.PipelineTree> getPipelineTreeForBwcSerialization() {
164+
return pipelineTreeForBwcSerialization;
165+
}
166+
138167
@SuppressWarnings("unchecked")
139168
private List<InternalAggregation> getInternalAggregations() {
140169
return (List<InternalAggregation>) aggregations;

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.ArrayList;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.function.Consumer;
3334
import java.util.function.Function;
3435

3536
public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation,
@@ -145,7 +146,7 @@ public static int countInnerBucket(Aggregation agg) {
145146
}
146147

147148
/**
148-
* Amulti-bucket agg needs to first reduce the buckets and *their* pipelines
149+
* A multi-bucket agg needs to first reduce the buckets and *their* pipelines
149150
* before allowing sibling pipelines to materialize.
150151
*/
151152
@Override
@@ -173,6 +174,13 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
173174
return modified ? create(newBuckets) : this;
174175
}
175176

177+
@Override
178+
public void forEachBucket(Consumer<InternalAggregations> consumer) {
179+
for (B bucket : getBuckets()) {
180+
consumer.accept((InternalAggregations) bucket.getAggregations());
181+
}
182+
}
183+
176184
private List<B> reducePipelineBuckets(ReduceContext reduceContext, PipelineTree pipelineTree) {
177185
List<B> reducedBuckets = new ArrayList<>();
178186
for (B bucket : getBuckets()) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Objects;
37+
import java.util.function.Consumer;
3738
import java.util.function.Function;
3839

3940
/**
@@ -181,6 +182,11 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
181182
return create(rewritten);
182183
}
183184

185+
@Override
186+
public void forEachBucket(Consumer<InternalAggregations> consumer) {
187+
consumer.accept(aggregations);
188+
}
189+
184190
@Override
185191
public boolean equals(Object obj) {
186192
if (this == obj) return true;

server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() {
8888
}
8989

9090
public static InternalAggregations createTestInstance() throws Exception {
91+
return createTestInstance(randomPipelineTree());
92+
}
93+
94+
public static InternalAggregations createTestInstance(PipelineAggregator.PipelineTree pipelineTree) throws Exception {
9195
List<InternalAggregation> aggsList = new ArrayList<>();
9296
if (randomBoolean()) {
9397
StringTermsTests stringTermsTests = new StringTermsTests();
@@ -104,7 +108,7 @@ public static InternalAggregations createTestInstance() throws Exception {
104108
InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests();
105109
aggsList.add(simpleValueTests.createTestInstance());
106110
}
107-
return new InternalAggregations(aggsList);
111+
return new InternalAggregations(aggsList, () -> pipelineTree);
108112
}
109113

110114
private static PipelineAggregator.PipelineTree randomPipelineTree() {
@@ -129,11 +133,9 @@ public void testSerialization() throws Exception {
129133
}
130134

131135
public void testGetTopLevelPipelineAggregators() throws Exception {
132-
InternalAggregations orig = createTestInstance();
133-
PipelineAggregator.PipelineTree tree = randomPipelineTree();
134-
InternalAggregations withPipelines = new InternalAggregations(orig.copyResults(), () -> tree);
135-
assertThat(withPipelines.aggregations, equalTo(orig.aggregations));
136-
assertThat(withPipelines.getTopLevelPipelineAggregators(), equalTo(tree.aggregators()));
136+
PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree();
137+
InternalAggregations aggs = createTestInstance(pipelineTree);
138+
assertThat(aggs.getTopLevelPipelineAggregators(), equalTo(pipelineTree.aggregators()));
137139
}
138140

139141
private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException {
@@ -146,8 +148,14 @@ private void writeToAndReadFrom(InternalAggregations aggregations, int iteration
146148
InternalAggregations deserialized = new InternalAggregations(in);
147149
assertEquals(aggregations.aggregations, deserialized.aggregations);
148150
if (iteration < 2) {
149-
//serialize this enough times to make sure that we are able to write again what we read
150-
writeToAndReadFrom(deserialized, iteration + 1);
151+
/*
152+
* Add the pipeline tree for bwc serialization just like we
153+
* do when we merge the aggregation. Without that we can't
154+
* properly serialize to older versions.
155+
*/
156+
InternalAggregations asThoughReduced = new InternalAggregations(
157+
deserialized.copyResults(), aggregations.getPipelineTreeForBwcSerialization());
158+
writeToAndReadFrom(asThoughReduced, iteration + 1);
151159
}
152160
}
153161
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ public void testStreamResponse() throws Exception {
102102
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
103103
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
104104
out.setVersion(version);
105+
if (version.before(Version.V_7_8_0)) {
106+
sigTerms.mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree.EMPTY);
107+
}
105108
out.writeNamedWriteable(sigTerms);
106109

107110
// read

test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,12 @@
126126
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
127127
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
128128
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
129+
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
129130
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
130131
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
131132
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
132133
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
134+
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
133135
import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue;
134136
import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;
135137
import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
@@ -140,6 +142,7 @@
140142
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
141143
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
142144
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
145+
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
143146

144147
import java.io.IOException;
145148
import java.util.ArrayList;
@@ -153,6 +156,7 @@
153156
import java.util.stream.Collectors;
154157

155158
import static java.util.Collections.emptyList;
159+
import static java.util.Collections.emptyMap;
156160
import static java.util.Collections.singletonMap;
157161
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
158162
import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.countInnerBucket;
@@ -384,6 +388,52 @@ public final void testFromXContentWithRandomFields() throws IOException {
384388
assertFromXContent(aggregation, parsedAggregation);
385389
}
386390

391+
public void testMergePipelineTreeForBWCSerialization() {
392+
T agg = createTestInstance();
393+
PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(agg);
394+
agg.mergePipelineTreeForBWCSerialization(pipelineTree);
395+
assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree);
396+
}
397+
398+
public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) {
399+
Map<String, PipelineTree> subTree = new HashMap<>();
400+
aggregation.forEachBucket(bucketAggs -> {
401+
for (Aggregation subAgg : bucketAggs) {
402+
if (subTree.containsKey(subAgg.getName())) {
403+
continue;
404+
}
405+
subTree.put(subAgg.getName(), randomPipelineTree((InternalAggregation) subAgg));
406+
}
407+
});
408+
return new PipelineAggregator.PipelineTree(emptyMap(), randomPipelineAggregators());
409+
}
410+
411+
public static List<PipelineAggregator> randomPipelineAggregators() {
412+
List<PipelineAggregator> pipelines = new ArrayList<>();
413+
if (randomBoolean()) {
414+
if (randomBoolean()) {
415+
pipelines.add(new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create());
416+
}
417+
if (randomBoolean()) {
418+
pipelines.add(new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create());
419+
}
420+
if (randomBoolean()) {
421+
pipelines.add(new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
422+
}
423+
}
424+
return pipelines;
425+
}
426+
427+
@SuppressWarnings("deprecation")
428+
private void assertMergedPipelineTreeForBWCSerialization(InternalAggregation agg, PipelineAggregator.PipelineTree pipelineTree) {
429+
assertThat(agg.pipelineAggregatorsForBwcSerialization(), equalTo(pipelineTree.aggregators()));
430+
agg.forEachBucket(bucketAggs -> {
431+
for (Aggregation subAgg : bucketAggs) {
432+
assertMergedPipelineTreeForBWCSerialization((InternalAggregation) subAgg, pipelineTree.subTree(subAgg.getName()));
433+
}
434+
});
435+
}
436+
387437
protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException;
388438

389439
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)