Skip to content

Commit 4772b5d

Browse files
authored
[ML] Addressing bug streaming DatafeedConfig aggs from (<= 6.5.4) -> 6.7.0 (#40659)
* [ML] Addressing bug streaming DatafeedConfig aggs from (<= 6.5.4) -> 6.7.0 (#40610) * Addressing stream failure and adding tests to catch such in the future * Add aggs to full cluster restart tests * Test BWC for datafeeds with and without aggs The wire serialisation is different for null/non-null aggs, so it's worth testing both cases. * Fixing bwc test, removing types * Fixing BWC test for datafeed * Update 40_ml_datafeed_crud.yml * Update build.gradle
1 parent 2c770ba commit 4772b5d

File tree

7 files changed

+334
-41
lines changed

7 files changed

+334
-41
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ static AggProvider fromStream(StreamInput in) throws IOException {
7373
} else if (in.getVersion().onOrAfter(Version.V_6_6_0)) { // Has the bug, but supports lazy objects
7474
return new AggProvider(in.readMap(), null, null);
7575
} else { // only supports eagerly parsed objects
76-
return AggProvider.fromParsedAggs(in.readOptionalWriteable(AggregatorFactories.Builder::new));
76+
// Upstream, we have read the bool already and know for sure that we have parsed aggs in the stream
77+
return AggProvider.fromParsedAggs(new AggregatorFactories.Builder(in));
7778
}
7879
}
7980

@@ -111,7 +112,8 @@ public void writeTo(StreamOutput out) throws IOException {
111112
// actually are aggregations defined
112113
throw new ElasticsearchException("Unsupported operation: parsed aggregations are null");
113114
}
114-
out.writeOptionalWriteable(parsedAggs);
115+
// Upstream we already verified that this calling object is not null, no need to write a second boolean to the stream
116+
parsedAggs.writeTo(out);
115117
}
116118
}
117119

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ public DatafeedConfig(StreamInput in) throws IOException {
212212
}
213213
// each of these writables are version aware
214214
this.queryProvider = QueryProvider.fromStream(in);
215+
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
215216
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
216217

217218
if (in.readBoolean()) {
@@ -420,6 +421,7 @@ public void writeTo(StreamOutput out) throws IOException {
420421

421422
// Each of these writables are version aware
422423
queryProvider.writeTo(out); // never null
424+
// This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
423425
out.writeOptionalWriteable(aggProvider);
424426

425427
if (scriptFields != null) {

x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
import org.elasticsearch.common.unit.TimeValue;
1414
import org.elasticsearch.common.util.concurrent.ThreadContext;
1515
import org.elasticsearch.common.xcontent.support.XContentMapValues;
16+
import org.elasticsearch.search.aggregations.AggregationBuilders;
17+
import org.elasticsearch.search.aggregations.AggregatorFactories;
18+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
19+
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
1620
import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
1721
import org.elasticsearch.xpack.core.ml.MlTasks;
1822
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -125,6 +129,7 @@ private void oldClusterTests() throws IOException {
125129
dfBuilder.setDelayedDataCheckConfig(null);
126130
}
127131
dfBuilder.setIndices(Collections.singletonList("airline-data"));
132+
addAggregations(dfBuilder);
128133

129134
Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID);
130135
putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build()));
@@ -258,4 +263,11 @@ private void assertJobNotPresent(String jobId, List<Map<String, Object>> jobs) {
258263
.filter(id -> id.equals(jobId)).findFirst();
259264
assertFalse(config.isPresent());
260265
}
266+
267+
private void addAggregations(DatafeedConfig.Builder dfBuilder) {
268+
TermsAggregationBuilder airline = AggregationBuilders.terms("airline");
269+
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time").subAggregation(airline);
270+
dfBuilder.setParsedAggregations(AggregatorFactories.builder().addAggregator(
271+
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
272+
}
261273
}

x-pack/qa/rolling-upgrade/build.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,10 @@ for (Version version : bwcVersions.wireCompatible) {
221221
systemProperty 'tests.first_round', 'true'
222222
// We only need to run these tests once so we may as well do it when we're two thirds upgraded
223223
systemProperty 'tests.rest.blacklist', [
224-
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
225-
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
226-
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster',
224+
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
225+
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
226+
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
227+
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster'
227228
].join(',')
228229
finalizedBy "${baseName}#oldClusterTestCluster#node1.stop"
229230
}

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/40_ml_datafeed_crud.yml

Lines changed: 104 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,44 @@
11
---
2-
"Test old cluster datafeed":
2+
"Test old cluster datafeed without aggs":
33
- do:
44
ml.get_datafeeds:
5-
datafeed_id: old-cluster-datafeed
6-
- match: { datafeeds.0.datafeed_id: "old-cluster-datafeed"}
5+
datafeed_id: old-cluster-datafeed-without-aggs
6+
- match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-without-aggs"}
77
- length: { datafeeds.0.indices: 1 }
88
- gte: { datafeeds.0.scroll_size: 2000 }
9+
- match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless }
10+
- is_false: datafeeds.0.aggregations
911

1012
- do:
1113
ml.get_datafeed_stats:
12-
datafeed_id: old-cluster-datafeed
14+
datafeed_id: old-cluster-datafeed-without-aggs
1315
- match: { datafeeds.0.state: "stopped"}
1416
- is_false: datafeeds.0.node
1517

1618
---
17-
"Put job and datafeed in mixed cluster":
19+
"Test old cluster datafeed with aggs":
20+
- do:
21+
ml.get_datafeeds:
22+
datafeed_id: old-cluster-datafeed-with-aggs
23+
- match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-with-aggs"}
24+
- length: { datafeeds.0.indices: 1 }
25+
- gte: { datafeeds.0.scroll_size: 2000 }
26+
- is_false: datafeeds.0.script_fields
27+
- match: { datafeeds.0.aggregations.buckets.date_histogram.field: time }
28+
- match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time }
29+
30+
- do:
31+
ml.get_datafeed_stats:
32+
datafeed_id: old-cluster-datafeed-with-aggs
33+
- match: { datafeeds.0.state: "stopped"}
34+
- is_false: datafeeds.0.node
35+
36+
---
37+
"Put job and datafeed without aggs in mixed cluster":
1838

1939
- do:
2040
ml.put_job:
21-
job_id: mixed-cluster-datafeed-job
41+
job_id: mixed-cluster-datafeed-job-without-aggs
2242
body: >
2343
{
2444
"description":"Cluster upgrade",
@@ -37,16 +57,90 @@
3757
3858
- do:
3959
ml.put_datafeed:
40-
datafeed_id: mixed-cluster-datafeed
60+
datafeed_id: mixed-cluster-datafeed-without-aggs
4161
body: >
4262
{
43-
"job_id":"mixed-cluster-datafeed-job",
63+
"job_id":"mixed-cluster-datafeed-job-without-aggs",
4464
"indices":["airline-data"],
45-
"scroll_size": 2000
65+
"scroll_size": 2000,
66+
"script_fields": {
67+
"double_responsetime": {
68+
"script": {
69+
"lang": "painless",
70+
"source": "doc['responsetime'].value * 2"
71+
}
72+
}
73+
}
74+
}
75+
76+
- do:
77+
ml.get_datafeed_stats:
78+
datafeed_id: mixed-cluster-datafeed-without-aggs
79+
- match: { datafeeds.0.state: stopped}
80+
- is_false: datafeeds.0.node
81+
82+
---
83+
"Put job and datafeed with aggs in mixed cluster":
84+
85+
- do:
86+
ml.put_job:
87+
job_id: mixed-cluster-datafeed-job-with-aggs
88+
body: >
89+
{
90+
"description":"Cluster upgrade",
91+
"analysis_config" : {
92+
"bucket_span": "60s",
93+
"summary_count_field_name": "doc_count",
94+
"detectors" :[{"function":"count"}]
95+
},
96+
"analysis_limits" : {
97+
"model_memory_limit": "50mb"
98+
},
99+
"data_description" : {
100+
"format":"xcontent",
101+
"time_field":"time"
102+
}
103+
}
104+
105+
- do:
106+
ml.put_datafeed:
107+
datafeed_id: mixed-cluster-datafeed-with-aggs
108+
body: >
109+
{
110+
"job_id":"mixed-cluster-datafeed-job-with-aggs",
111+
"indices":["airline-data"],
112+
"scroll_size": 2000,
113+
"aggregations": {
114+
"buckets": {
115+
"date_histogram": {
116+
"field": "time",
117+
"interval": "30s",
118+
"time_zone": "UTC"
119+
},
120+
"aggregations": {
121+
"time": {
122+
"max": {"field": "time"}
123+
},
124+
"airline": {
125+
"terms": {
126+
"field": "airline",
127+
"size": 100
128+
},
129+
"aggregations": {
130+
"responsetime": {
131+
"avg": {
132+
"field": "responsetime"
133+
}
134+
}
135+
}
136+
}
137+
}
138+
}
139+
}
46140
}
47141
48142
- do:
49143
ml.get_datafeed_stats:
50-
datafeed_id: mixed-cluster-datafeed
144+
datafeed_id: mixed-cluster-datafeed-with-aggs
51145
- match: { datafeeds.0.state: stopped}
52146
- is_false: datafeeds.0.node

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
---
2-
"Put job and datafeed in old cluster":
2+
"Put job and datafeed without aggs in old cluster":
33

44
- do:
55
ml.put_job:
6-
job_id: old-cluster-datafeed-job
6+
job_id: old-cluster-datafeed-job-without-aggs
77
body: >
88
{
99
"description":"Cluster upgrade",
@@ -19,20 +19,95 @@
1919
"time_field":"time"
2020
}
2121
}
22-
- match: { job_id: old-cluster-datafeed-job }
22+
- match: { job_id: old-cluster-datafeed-job-without-aggs }
2323

2424
- do:
2525
ml.put_datafeed:
26-
datafeed_id: old-cluster-datafeed
26+
datafeed_id: old-cluster-datafeed-without-aggs
2727
body: >
2828
{
29-
"job_id":"old-cluster-datafeed-job",
29+
"job_id":"old-cluster-datafeed-job-without-aggs",
3030
"indices":["airline-data"],
31-
"scroll_size": 2000
31+
"scroll_size": 2000,
32+
"script_fields": {
33+
"double_responsetime": {
34+
"script": {
35+
"lang": "painless",
36+
"source": "doc['responsetime'].value * 2"
37+
}
38+
}
39+
}
40+
}
41+
42+
- do:
43+
ml.get_datafeed_stats:
44+
datafeed_id: old-cluster-datafeed-without-aggs
45+
- match: { datafeeds.0.state: stopped}
46+
- is_false: datafeeds.0.node
47+
48+
---
49+
"Put job and datafeed with aggs in old cluster":
50+
51+
- do:
52+
ml.put_job:
53+
job_id: old-cluster-datafeed-job-with-aggs
54+
body: >
55+
{
56+
"description":"Cluster upgrade",
57+
"analysis_config" : {
58+
"bucket_span": "60s",
59+
"summary_count_field_name": "doc_count",
60+
"detectors" :[{"function":"count"}]
61+
},
62+
"analysis_limits" : {
63+
"model_memory_limit": "50mb"
64+
},
65+
"data_description" : {
66+
"format":"xcontent",
67+
"time_field":"time"
68+
}
69+
}
70+
- match: { job_id: old-cluster-datafeed-job-with-aggs }
71+
72+
- do:
73+
ml.put_datafeed:
74+
datafeed_id: old-cluster-datafeed-with-aggs
75+
body: >
76+
{
77+
"job_id":"old-cluster-datafeed-job-with-aggs",
78+
"indices":["airline-data"],
79+
"scroll_size": 2000,
80+
"aggregations": {
81+
"buckets": {
82+
"date_histogram": {
83+
"field": "time",
84+
"interval": "30s",
85+
"time_zone": "UTC"
86+
},
87+
"aggregations": {
88+
"time": {
89+
"max": {"field": "time"}
90+
},
91+
"airline": {
92+
"terms": {
93+
"field": "airline",
94+
"size": 100
95+
},
96+
"aggregations": {
97+
"responsetime": {
98+
"avg": {
99+
"field": "responsetime"
100+
}
101+
}
102+
}
103+
}
104+
}
105+
}
106+
}
32107
}
33108
34109
- do:
35110
ml.get_datafeed_stats:
36-
datafeed_id: old-cluster-datafeed
111+
datafeed_id: old-cluster-datafeed-with-aggs
37112
- match: { datafeeds.0.state: stopped}
38113
- is_false: datafeeds.0.node

0 commit comments

Comments
 (0)