Skip to content

Commit 6b0beb7

Browse files
authored
[ML] make ML stats APIs cancellable (#88030)
Many machine learning stats APIs make multiple searches per call. Making them cancellable allows for those searches to be cancelled if the HTTP connection is closed. This improves scalability and performance. Relates #88010
1 parent aa7f8cb commit 6b0beb7

30 files changed

+400
-237
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import org.elasticsearch.common.io.stream.StreamOutput;
1919
import org.elasticsearch.common.io.stream.Writeable;
2020
import org.elasticsearch.core.Nullable;
21+
import org.elasticsearch.tasks.CancellableTask;
2122
import org.elasticsearch.tasks.Task;
23+
import org.elasticsearch.tasks.TaskId;
2224
import org.elasticsearch.xcontent.ParseField;
2325
import org.elasticsearch.xcontent.ToXContentObject;
2426
import org.elasticsearch.xcontent.XContentBuilder;
@@ -39,6 +41,8 @@
3941
import java.util.Map;
4042
import java.util.Objects;
4143

44+
import static org.elasticsearch.core.Strings.format;
45+
4246
public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAnalyticsStatsAction.Response> {
4347

4448
public static final GetDataFrameAnalyticsStatsAction INSTANCE = new GetDataFrameAnalyticsStatsAction();
@@ -141,6 +145,11 @@ public boolean equals(Object obj) {
141145
Request other = (Request) obj;
142146
return Objects.equals(id, other.id) && allowNoMatch == other.allowNoMatch && Objects.equals(pageParams, other.pageParams);
143147
}
148+
149+
@Override
150+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
151+
return new CancellableTask(id, type, action, format("get_data_frame_analytics_stats[%s]", id), parentTaskId, headers);
152+
}
144153
}
145154

146155
public static class Response extends BaseTasksResponse implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.io.stream.Writeable;
1717
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.tasks.CancellableTask;
1819
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.tasks.TaskId;
1921
import org.elasticsearch.xcontent.ToXContentObject;
2022
import org.elasticsearch.xcontent.XContentBuilder;
2123
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -29,6 +31,8 @@
2931
import java.util.Set;
3032
import java.util.stream.Collectors;
3133

34+
import static org.elasticsearch.core.Strings.format;
35+
3236
/**
3337
* Internal only action to get the current running state of a datafeed
3438
*/
@@ -68,6 +72,11 @@ public Set<String> getDatafeedTaskIds() {
6872
public boolean match(Task task) {
6973
return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription());
7074
}
75+
76+
@Override
77+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
78+
return new CancellableTask(id, type, action, format("get_datafeed_running_state[%s]", datafeedTaskIds), parentTaskId, headers);
79+
}
7180
}
7281

7382
public static class Response extends BaseTasksResponse {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import org.elasticsearch.common.io.stream.Writeable;
1717
import org.elasticsearch.core.Nullable;
1818
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
19+
import org.elasticsearch.tasks.CancellableTask;
20+
import org.elasticsearch.tasks.Task;
21+
import org.elasticsearch.tasks.TaskId;
1922
import org.elasticsearch.xcontent.ToXContentObject;
2023
import org.elasticsearch.xcontent.XContentBuilder;
2124
import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
@@ -36,6 +39,8 @@
3639
import java.util.Objects;
3740
import java.util.stream.Collectors;
3841

42+
import static org.elasticsearch.core.Strings.format;
43+
3944
public class GetDatafeedsStatsAction extends ActionType<GetDatafeedsStatsAction.Response> {
4045

4146
public static final GetDatafeedsStatsAction INSTANCE = new GetDatafeedsStatsAction();
@@ -114,6 +119,11 @@ public boolean equals(Object obj) {
114119
Request other = (Request) obj;
115120
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(allowNoMatch, other.allowNoMatch);
116121
}
122+
123+
@Override
124+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
125+
return new CancellableTask(id, type, action, format("get_datafeed_stats[%s]", datafeedId), parentTaskId, headers);
126+
}
117127
}
118128

119129
public static class Response extends AbstractGetResourcesResponse<Response.DatafeedStats> implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.io.stream.Writeable;
1717
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.tasks.CancellableTask;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.tasks.TaskId;
1821
import org.elasticsearch.xcontent.ParseField;
1922
import org.elasticsearch.xcontent.ToXContentObject;
2023
import org.elasticsearch.xcontent.XContentBuilder;
@@ -28,6 +31,7 @@
2831
import java.util.Map;
2932
import java.util.Objects;
3033

34+
import static org.elasticsearch.core.Strings.format;
3135
import static org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction.Request.SNAPSHOT_ID;
3236

3337
public class GetJobModelSnapshotsUpgradeStatsAction extends ActionType<GetJobModelSnapshotsUpgradeStatsAction.Response> {
@@ -115,6 +119,18 @@ public boolean equals(Object obj) {
115119
&& Objects.equals(snapshotId, other.snapshotId)
116120
&& Objects.equals(allowNoMatch, other.allowNoMatch);
117121
}
122+
123+
@Override
124+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
125+
return new CancellableTask(
126+
id,
127+
type,
128+
action,
129+
format("get_job_model_snapshot_upgrade_stats[%s:%s]", jobId, snapshotId),
130+
parentTaskId,
131+
headers
132+
);
133+
}
118134
}
119135

120136
public static class Response extends AbstractGetResourcesResponse<Response.JobModelSnapshotUpgradeStats> implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.elasticsearch.common.io.stream.Writeable;
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.tasks.CancellableTask;
2223
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.tasks.TaskId;
2325
import org.elasticsearch.xcontent.ToXContentObject;
2426
import org.elasticsearch.xcontent.XContentBuilder;
2527
import org.elasticsearch.xpack.core.action.util.QueryPage;
@@ -38,6 +40,8 @@
3840
import java.util.Map;
3941
import java.util.Objects;
4042

43+
import static org.elasticsearch.core.Strings.format;
44+
4145
public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response> {
4246

4347
public static final GetJobsStatsAction INSTANCE = new GetJobsStatsAction();
@@ -134,6 +138,11 @@ public boolean equals(Object obj) {
134138
&& Objects.equals(allowNoMatch, other.allowNoMatch)
135139
&& Objects.equals(getTimeout(), other.getTimeout());
136140
}
141+
142+
@Override
143+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
144+
return new CancellableTask(id, type, action, format("get_job_stats[%s]", id), parentTaskId, headers);
145+
}
137146
}
138147

139148
public static class Response extends BaseTasksResponse implements ToXContentObject {

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,10 @@ public void testUpdateWithValidatorFunctionThatErrors() throws Exception {
248248
List<String> updateIndices = Collections.singletonList("a-different-index");
249249
update.setIndices(updateIndices);
250250

251-
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validateErrorFunction = (updatedConfig, listener) -> {
252-
new Thread(() -> listener.onFailure(new IllegalArgumentException("this is a bad update")), getTestName()).start();
253-
};
251+
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validateErrorFunction = (updatedConfig, listener) -> new Thread(
252+
() -> listener.onFailure(new IllegalArgumentException("this is a bad update")),
253+
getTestName()
254+
).start();
254255

255256
AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>();
256257
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
@@ -277,7 +278,7 @@ public void testAllowNoMatch() throws InterruptedException {
277278
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
278279

279280
blockingCall(
280-
actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, null, false, actionListener),
281+
actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, null, false, null, actionListener),
281282
datafeedIdsHolder,
282283
exceptionHolder
283284
);
@@ -289,7 +290,7 @@ public void testAllowNoMatch() throws InterruptedException {
289290

290291
exceptionHolder.set(null);
291292
blockingCall(
292-
actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, null, false, actionListener),
293+
actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, null, false, null, actionListener),
293294
datafeedIdsHolder,
294295
exceptionHolder
295296
);
@@ -298,7 +299,7 @@ public void testAllowNoMatch() throws InterruptedException {
298299

299300
AtomicReference<List<DatafeedConfig.Builder>> datafeedsHolder = new AtomicReference<>();
300301
blockingCall(
301-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", false, actionListener),
302+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", false, null, actionListener),
302303
datafeedsHolder,
303304
exceptionHolder
304305
);
@@ -310,7 +311,7 @@ public void testAllowNoMatch() throws InterruptedException {
310311

311312
exceptionHolder.set(null);
312313
blockingCall(
313-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener),
314+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, null, actionListener),
314315
datafeedsHolder,
315316
exceptionHolder
316317
);
@@ -329,53 +330,59 @@ public void testExpandDatafeeds() throws Exception {
329330

330331
// Test datafeed IDs only
331332
SortedSet<String> expandedIds = blockingCall(
332-
actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, null, false, actionListener)
333+
actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, null, false, null, actionListener)
333334
);
334335
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
335336

336-
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, null, false, actionListener));
337+
expandedIds = blockingCall(
338+
actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, null, false, null, actionListener)
339+
);
337340
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
338341

339-
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, null, false, actionListener));
342+
expandedIds = blockingCall(
343+
actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, null, false, null, actionListener)
344+
);
340345
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2")), expandedIds);
341346

342-
expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, null, false, actionListener));
347+
expandedIds = blockingCall(
348+
actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, null, false, null, actionListener)
349+
);
343350
assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
344351

345352
expandedIds = blockingCall(
346-
actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, null, false, actionListener)
353+
actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, null, false, null, actionListener)
347354
);
348355
assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds);
349356

350357
// Test full datafeed config
351358
List<DatafeedConfig.Builder> expandedDatafeedBuilders = blockingCall(
352-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener)
359+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, null, actionListener)
353360
);
354361
List<DatafeedConfig> expandedDatafeeds = expandedDatafeedBuilders.stream()
355362
.map(DatafeedConfig.Builder::build)
356363
.collect(Collectors.toList());
357364
assertThat(expandedDatafeeds, containsInAnyOrder(foo1, foo2));
358365

359366
expandedDatafeedBuilders = blockingCall(
360-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*-1", true, actionListener)
367+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*-1", true, null, actionListener)
361368
);
362369
expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
363370
assertThat(expandedDatafeeds, containsInAnyOrder(foo1, bar1));
364371

365372
expandedDatafeedBuilders = blockingCall(
366-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar*", true, actionListener)
373+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar*", true, null, actionListener)
367374
);
368375
expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
369376
assertThat(expandedDatafeeds, containsInAnyOrder(bar1, bar2));
370377

371378
expandedDatafeedBuilders = blockingCall(
372-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("b*r-1", true, actionListener)
379+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("b*r-1", true, null, actionListener)
373380
);
374381
expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
375382
assertThat(expandedDatafeeds, containsInAnyOrder(bar1));
376383

377384
expandedDatafeedBuilders = blockingCall(
378-
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar-1,foo*", true, actionListener)
385+
actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar-1,foo*", true, null, actionListener)
379386
);
380387
expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
381388
assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2));
@@ -398,12 +405,12 @@ public void testExpandDatafeedsWithTaskData() throws Exception {
398405
AtomicReference<SortedSet<String>> datafeedIdsHolder = new AtomicReference<>();
399406
// Test datafeed IDs only
400407
SortedSet<String> expandedIds = blockingCall(
401-
actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", false, tasks, true, actionListener)
408+
actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", false, tasks, true, null, actionListener)
402409
);
403410
assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
404411

405412
blockingCall(
406-
actionListener -> datafeedConfigProvider.expandDatafeedIds("foo-1*,foo-2*", false, tasks, false, actionListener),
413+
actionListener -> datafeedConfigProvider.expandDatafeedIds("foo-1*,foo-2*", false, tasks, false, null, actionListener),
407414
datafeedIdsHolder,
408415
exceptionHolder
409416
);

0 commit comments

Comments
 (0)