Skip to content

Commit d4ec21b

Browse files
author
Hendrik Muhs
authored
[ML-DataFrame] Rewrite continuous logic to prevent terms count limit (#44219)
Rewrites how continuous data frame transforms calculate and handle buckets that require an update. Instead of storing the whole set in memory, the transform pages through the updates using a second cursor. This lowers memory consumption and prevents problems with limits at query time (max_terms_count). The list of updates can be re-retrieved in case of a failure (#43662)
1 parent 0b6676a commit d4ec21b

File tree

19 files changed

+908
-279
lines changed

19 files changed

+908
-279
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ task verifyVersions {
160160
* after the backport of the backcompat code is complete.
161161
*/
162162

163-
boolean bwc_tests_enabled = true
164-
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
163+
boolean bwc_tests_enabled = false
164+
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/44219" /* place a PR link here when committing bwc changes */
165165
if (bwc_tests_enabled == false) {
166166
if (bwc_tests_disabled_issue.isEmpty()) {
167167
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.dataframe.transforms;
21+
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
25+
import org.elasticsearch.common.xcontent.XContentParser;
26+
27+
import java.io.IOException;
28+
import java.util.Collections;
29+
import java.util.Map;
30+
import java.util.Objects;
31+
32+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
33+
34+
/**
35+
* Holds state of the cursors:
36+
*
37+
* indexer_position: the position of the indexer querying the source
38+
* bucket_position: the position used for identifying changes
39+
*/
40+
public class DataFrameIndexerPosition {
41+
public static final ParseField INDEXER_POSITION = new ParseField("indexer_position");
42+
public static final ParseField BUCKET_POSITION = new ParseField("bucket_position");
43+
44+
private final Map<String, Object> indexerPosition;
45+
private final Map<String, Object> bucketPosition;
46+
47+
@SuppressWarnings("unchecked")
48+
public static final ConstructingObjectParser<DataFrameIndexerPosition, Void> PARSER = new ConstructingObjectParser<>(
49+
"data_frame_indexer_position",
50+
true,
51+
args -> new DataFrameIndexerPosition((Map<String, Object>) args[0],(Map<String, Object>) args[1]));
52+
53+
static {
54+
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT);
55+
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT);
56+
}
57+
58+
public DataFrameIndexerPosition(Map<String, Object> indexerPosition, Map<String, Object> bucketPosition) {
59+
this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition);
60+
this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition);
61+
}
62+
63+
public Map<String, Object> getIndexerPosition() {
64+
return indexerPosition;
65+
}
66+
67+
public Map<String, Object> getBucketsPosition() {
68+
return bucketPosition;
69+
}
70+
71+
@Override
72+
public boolean equals(Object other) {
73+
if (this == other) {
74+
return true;
75+
}
76+
77+
if (other == null || getClass() != other.getClass()) {
78+
return false;
79+
}
80+
81+
DataFrameIndexerPosition that = (DataFrameIndexerPosition) other;
82+
83+
return Objects.equals(this.indexerPosition, that.indexerPosition) &&
84+
Objects.equals(this.bucketPosition, that.bucketPosition);
85+
}
86+
87+
@Override
88+
public int hashCode() {
89+
return Objects.hash(indexerPosition, bucketPosition);
90+
}
91+
92+
public static DataFrameIndexerPosition fromXContent(XContentParser parser) {
93+
try {
94+
return PARSER.parse(parser, null);
95+
} catch (IOException e) {
96+
throw new RuntimeException(e);
97+
}
98+
}
99+
}

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import org.elasticsearch.common.xcontent.XContentParser;
2828

2929
import java.io.IOException;
30-
import java.util.Collections;
31-
import java.util.LinkedHashMap;
3230
import java.util.Map;
3331
import java.util.Objects;
3432

@@ -39,7 +37,10 @@ public class DataFrameTransformState {
3937

4038
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
4139
private static final ParseField TASK_STATE = new ParseField("task_state");
40+
41+
// 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position.
4242
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
43+
private static final ParseField POSITION = new ParseField("position");
4344
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
4445
private static final ParseField REASON = new ParseField("reason");
4546
private static final ParseField PROGRESS = new ParseField("progress");
@@ -48,18 +49,31 @@ public class DataFrameTransformState {
4849
@SuppressWarnings("unchecked")
4950
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
5051
new ConstructingObjectParser<>("data_frame_transform_state", true,
51-
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
52-
(IndexerState) args[1],
53-
(Map<String, Object>) args[2],
54-
(long) args[3],
55-
(String) args[4],
56-
(DataFrameTransformProgress) args[5],
57-
(NodeAttributes) args[6]));
52+
args -> {
53+
DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0];
54+
IndexerState indexerState = (IndexerState) args[1];
55+
Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
56+
DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3];
57+
58+
// BWC handling, translate current_position to position iff position isn't set
59+
if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) {
60+
dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null);
61+
}
62+
63+
long checkpoint = (long) args[4];
64+
String reason = (String) args[5];
65+
DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
66+
NodeAttributes node = (NodeAttributes) args[7];
67+
68+
return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress,
69+
node);
70+
});
5871

5972
static {
6073
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
6174
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
6275
PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
76+
PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT);
6377
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
6478
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
6579
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
@@ -73,21 +87,21 @@ public static DataFrameTransformState fromXContent(XContentParser parser) throws
7387
private final DataFrameTransformTaskState taskState;
7488
private final IndexerState indexerState;
7589
private final long checkpoint;
76-
private final Map<String, Object> currentPosition;
90+
private final DataFrameIndexerPosition position;
7791
private final String reason;
7892
private final DataFrameTransformProgress progress;
7993
private final NodeAttributes node;
8094

8195
public DataFrameTransformState(DataFrameTransformTaskState taskState,
8296
IndexerState indexerState,
83-
@Nullable Map<String, Object> position,
97+
@Nullable DataFrameIndexerPosition position,
8498
long checkpoint,
8599
@Nullable String reason,
86100
@Nullable DataFrameTransformProgress progress,
87101
@Nullable NodeAttributes node) {
88102
this.taskState = taskState;
89103
this.indexerState = indexerState;
90-
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
104+
this.position = position;
91105
this.checkpoint = checkpoint;
92106
this.reason = reason;
93107
this.progress = progress;
@@ -103,8 +117,8 @@ public DataFrameTransformTaskState getTaskState() {
103117
}
104118

105119
@Nullable
106-
public Map<String, Object> getPosition() {
107-
return currentPosition;
120+
public DataFrameIndexerPosition getPosition() {
121+
return position;
108122
}
109123

110124
public long getCheckpoint() {
@@ -140,7 +154,7 @@ public boolean equals(Object other) {
140154

141155
return Objects.equals(this.taskState, that.taskState) &&
142156
Objects.equals(this.indexerState, that.indexerState) &&
143-
Objects.equals(this.currentPosition, that.currentPosition) &&
157+
Objects.equals(this.position, that.position) &&
144158
Objects.equals(this.progress, that.progress) &&
145159
this.checkpoint == that.checkpoint &&
146160
Objects.equals(this.node, that.node) &&
@@ -149,7 +163,7 @@ public boolean equals(Object other) {
149163

150164
@Override
151165
public int hashCode() {
152-
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
166+
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
153167
}
154168

155169
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.dataframe.transforms;
21+
22+
import org.elasticsearch.common.xcontent.XContentBuilder;
23+
import org.elasticsearch.test.ESTestCase;
24+
25+
import java.io.IOException;
26+
import java.util.LinkedHashMap;
27+
import java.util.Map;
28+
29+
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
30+
31+
public class DataFrameIndexerPositionTests extends ESTestCase {
32+
33+
public void testFromXContent() throws IOException {
34+
xContentTester(this::createParser,
35+
DataFrameIndexerPositionTests::randomDataFrameIndexerPosition,
36+
DataFrameIndexerPositionTests::toXContent,
37+
DataFrameIndexerPosition::fromXContent)
38+
.supportsUnknownFields(true)
39+
.randomFieldsExcludeFilter(field -> field.equals("indexer_position") ||
40+
field.equals("bucket_position"))
41+
.test();
42+
}
43+
44+
public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
45+
return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap());
46+
}
47+
48+
public static void toXContent(DataFrameIndexerPosition position, XContentBuilder builder) throws IOException {
49+
builder.startObject();
50+
if (position.getIndexerPosition() != null) {
51+
builder.field("indexer_position", position.getIndexerPosition());
52+
}
53+
if (position.getBucketsPosition() != null) {
54+
builder.field("bucket_position", position.getBucketsPosition());
55+
}
56+
builder.endObject();
57+
}
58+
59+
private static Map<String, Object> randomPositionMap() {
60+
if (randomBoolean()) {
61+
return null;
62+
}
63+
int numFields = randomIntBetween(1, 5);
64+
Map<String, Object> position = new LinkedHashMap<>();
65+
for (int i = 0; i < numFields; i++) {
66+
Object value;
67+
if (randomBoolean()) {
68+
value = randomLong();
69+
} else {
70+
value = randomAlphaOfLengthBetween(1, 10);
71+
}
72+
position.put(randomAlphaOfLengthBetween(3, 10), value);
73+
}
74+
return position;
75+
}
76+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.elasticsearch.test.ESTestCase;
2626

2727
import java.io.IOException;
28-
import java.util.LinkedHashMap;
29-
import java.util.Map;
3028

3129
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
3230

@@ -38,15 +36,16 @@ public void testFromXContent() throws IOException {
3836
DataFrameTransformStateTests::toXContent,
3937
DataFrameTransformState::fromXContent)
4038
.supportsUnknownFields(true)
41-
.randomFieldsExcludeFilter(field -> field.equals("current_position") ||
42-
field.equals("node.attributes"))
39+
.randomFieldsExcludeFilter(field -> field.equals("position.indexer_position") ||
40+
field.equals("position.bucket_position") ||
41+
field.equals("node.attributes"))
4342
.test();
4443
}
4544

4645
public static DataFrameTransformState randomDataFrameTransformState() {
4746
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
4847
randomFrom(IndexerState.values()),
49-
randomPositionMap(),
48+
randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
5049
randomLongBetween(0,10),
5150
randomBoolean() ? null : randomAlphaOfLength(10),
5251
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
@@ -58,7 +57,8 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui
5857
builder.field("task_state", state.getTaskState().value());
5958
builder.field("indexer_state", state.getIndexerState().value());
6059
if (state.getPosition() != null) {
61-
builder.field("current_position", state.getPosition());
60+
builder.field("position");
61+
DataFrameIndexerPositionTests.toXContent(state.getPosition(), builder);
6262
}
6363
builder.field("checkpoint", state.getCheckpoint());
6464
if (state.getReason() != null) {
@@ -75,21 +75,4 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui
7575
builder.endObject();
7676
}
7777

78-
private static Map<String, Object> randomPositionMap() {
79-
if (randomBoolean()) {
80-
return null;
81-
}
82-
int numFields = randomIntBetween(1, 5);
83-
Map<String, Object> position = new LinkedHashMap<>();
84-
for (int i = 0; i < numFields; i++) {
85-
Object value;
86-
if (randomBoolean()) {
87-
value = randomLong();
88-
} else {
89-
value = randomAlphaOfLengthBetween(1, 10);
90-
}
91-
position.put(randomAlphaOfLengthBetween(3, 10), value);
92-
}
93-
return position;
94-
}
9578
}

0 commit comments

Comments
 (0)