Skip to content

Commit 509c674

Browse files
authored
Move async task maintenance service to core plugin (#57700)
The async task task maintenance service is used by both async search plugin as well as EQL plugin. So it needs to reside in the core. Relates to #49638
1 parent 8072646 commit 509c674

File tree

10 files changed

+216
-95
lines changed

10 files changed

+216
-95
lines changed

x-pack/plugin/async-search/build.gradle

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,25 @@ archivesBaseName = 'x-pack-async-search'
1313
compileJava.options.compilerArgs << "-Xlint:-rawtypes"
1414
compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"
1515

16+
/*
17+
* Licensed to Elasticsearch under one or more contributor
18+
* license agreements. See the NOTICE file distributed with
19+
* this work for additional information regarding copyright
20+
* ownership. Elasticsearch licenses this file to you under
21+
* the Apache License, Version 2.0 (the "License"); you may
22+
* not use this file except in compliance with the License.
23+
* You may obtain a copy of the License at
24+
*
25+
* http://www.apache.org/licenses/LICENSE-2.0
26+
*
27+
* Unless required by applicable law or agreed to in writing,
28+
* software distributed under the License is distributed on an
29+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
30+
* KIND, either express or implied. See the License for the
31+
* specific language governing permissions and limitations
32+
* under the License.
33+
*/
34+
1635
// add all sub-projects of the qa sub-project
1736
gradle.projectsEvaluated {
1837
project.subprojects
@@ -28,6 +47,7 @@ dependencies {
2847
compileOnly project(path: xpackModule('core'), configuration: 'default')
2948
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
3049
testCompile project(path: xpackModule('ilm'))
50+
testCompile project(path: xpackModule('async'))
3151
}
3252

3353
dependencyLicenses {

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java

Lines changed: 1 addition & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,49 +7,28 @@
77

88
import org.elasticsearch.action.ActionRequest;
99
import org.elasticsearch.action.ActionResponse;
10-
import org.elasticsearch.client.Client;
1110
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
12-
import org.elasticsearch.cluster.node.DiscoveryNode;
1311
import org.elasticsearch.cluster.node.DiscoveryNodes;
14-
import org.elasticsearch.cluster.service.ClusterService;
15-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1612
import org.elasticsearch.common.settings.ClusterSettings;
1713
import org.elasticsearch.common.settings.IndexScopedSettings;
1814
import org.elasticsearch.common.settings.Setting;
1915
import org.elasticsearch.common.settings.Settings;
2016
import org.elasticsearch.common.settings.SettingsFilter;
21-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
22-
import org.elasticsearch.env.Environment;
23-
import org.elasticsearch.env.NodeEnvironment;
2417
import org.elasticsearch.plugins.ActionPlugin;
2518
import org.elasticsearch.plugins.Plugin;
26-
import org.elasticsearch.repositories.RepositoriesService;
2719
import org.elasticsearch.rest.RestController;
2820
import org.elasticsearch.rest.RestHandler;
29-
import org.elasticsearch.script.ScriptService;
30-
import org.elasticsearch.threadpool.ThreadPool;
31-
import org.elasticsearch.watcher.ResourceWatcherService;
32-
import org.elasticsearch.xpack.core.XPackPlugin;
33-
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
34-
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
3521
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
3622
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
3723

3824
import java.util.Arrays;
39-
import java.util.Collection;
4025
import java.util.Collections;
4126
import java.util.List;
4227
import java.util.function.Supplier;
4328

44-
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
45-
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
29+
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
4630

4731
public final class AsyncSearch extends Plugin implements ActionPlugin {
48-
private final Settings settings;
49-
50-
public AsyncSearch(Settings settings) {
51-
this.settings = settings;
52-
}
5332

5433
@Override
5534
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
@@ -71,31 +50,6 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
7150
);
7251
}
7352

74-
@Override
75-
public Collection<Object> createComponents(Client client,
76-
ClusterService clusterService,
77-
ThreadPool threadPool,
78-
ResourceWatcherService resourceWatcherService,
79-
ScriptService scriptService,
80-
NamedXContentRegistry xContentRegistry,
81-
Environment environment,
82-
NodeEnvironment nodeEnvironment,
83-
NamedWriteableRegistry namedWriteableRegistry,
84-
IndexNameExpressionResolver indexNameExpressionResolver,
85-
Supplier<RepositoriesService> repositoriesServiceSupplier) {
86-
if (DiscoveryNode.isDataNode(environment.settings())) {
87-
// only data nodes should be eligible to run the maintenance service.
88-
AsyncTaskIndexService<AsyncSearchResponse> indexService =
89-
new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client,
90-
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry);
91-
AsyncSearchMaintenanceService maintenanceService =
92-
new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
93-
return Collections.singletonList(maintenanceService);
94-
} else {
95-
return Collections.emptyList();
96-
}
97-
}
98-
9953
@Override
10054
public List<Setting<?>> getSettings() {
10155
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1313
import org.elasticsearch.action.get.GetResponse;
1414
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin;
1516
import org.elasticsearch.cluster.ClusterChangedEvent;
1617
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -30,10 +31,11 @@
3031
import org.elasticsearch.test.InternalTestCluster;
3132
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3233
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
34+
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
35+
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
3336
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
3437
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3538
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
36-
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
3739
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
3840
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
3941
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
@@ -50,7 +52,7 @@
5052
import java.util.concurrent.ExecutionException;
5153

5254
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
53-
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
55+
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
5456
import static org.hamcrest.Matchers.equalTo;
5557
import static org.hamcrest.Matchers.lessThanOrEqualTo;
5658

@@ -79,7 +81,7 @@ public List<AggregationSpec> getAggregations() {
7981

8082
@Before
8183
public void startMaintenanceService() {
82-
for (AsyncSearchMaintenanceService service : internalCluster().getDataNodeInstances(AsyncSearchMaintenanceService.class)) {
84+
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
8385
if (service.lifecycleState() == Lifecycle.State.STOPPED) {
8486
// force the service to start again
8587
service.start();
@@ -91,7 +93,7 @@ public void startMaintenanceService() {
9193

9294
@After
9395
public void stopMaintenanceService() {
94-
for (AsyncSearchMaintenanceService service : internalCluster().getDataNodeInstances(AsyncSearchMaintenanceService.class)) {
96+
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
9597
service.stop();
9698
}
9799
}
@@ -103,7 +105,7 @@ public void releaseQueryLatch() {
103105

104106
@Override
105107
protected Collection<Class<? extends Plugin>> nodePlugins() {
106-
return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, IndexLifecycle.class,
108+
return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, AsyncResultsIndexPlugin.class, IndexLifecycle.class,
107109
SearchTestPlugin.class, ReindexPlugin.class);
108110
}
109111

x-pack/plugin/async/build.gradle

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
evaluationDependsOn(xpackModule('core'))
21+
22+
apply plugin: 'elasticsearch.esplugin'
23+
24+
esplugin {
25+
name 'x-pack-async'
26+
description 'A module which handles common async operations'
27+
classname 'org.elasticsearch.xpack.async.AsyncResultsIndexPlugin'
28+
extendedPlugins = ['x-pack-core']
29+
}
30+
archivesBaseName = 'x-pack-async'
31+
32+
dependencies {
33+
compileOnly project(":server")
34+
compileOnly project(path: xpackModule('core'), configuration: 'default')
35+
}
36+
37+
dependencyLicenses {
38+
ignoreSha 'x-pack-core'
39+
}
40+
41+
integTest.enabled = false
42+
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.async;
8+
9+
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
11+
import org.elasticsearch.cluster.node.DiscoveryNode;
12+
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
16+
import org.elasticsearch.env.Environment;
17+
import org.elasticsearch.env.NodeEnvironment;
18+
import org.elasticsearch.indices.SystemIndexDescriptor;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.plugins.SystemIndexPlugin;
21+
import org.elasticsearch.repositories.RepositoriesService;
22+
import org.elasticsearch.script.ScriptService;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.elasticsearch.watcher.ResourceWatcherService;
25+
import org.elasticsearch.xpack.core.XPackPlugin;
26+
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
27+
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
28+
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
29+
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.function.Supplier;
35+
36+
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
37+
38+
public class AsyncResultsIndexPlugin extends Plugin implements SystemIndexPlugin {
39+
40+
protected final Settings settings;
41+
42+
public AsyncResultsIndexPlugin(Settings settings) {
43+
this.settings = settings;
44+
}
45+
46+
@Override
47+
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
48+
return Collections.singletonList(new SystemIndexDescriptor(XPackPlugin.ASYNC_RESULTS_INDEX, this.getClass().getSimpleName()));
49+
}
50+
51+
@Override
52+
public Collection<Object> createComponents(
53+
Client client,
54+
ClusterService clusterService,
55+
ThreadPool threadPool,
56+
ResourceWatcherService resourceWatcherService,
57+
ScriptService scriptService,
58+
NamedXContentRegistry xContentRegistry,
59+
Environment environment,
60+
NodeEnvironment nodeEnvironment,
61+
NamedWriteableRegistry namedWriteableRegistry,
62+
IndexNameExpressionResolver indexNameExpressionResolver,
63+
Supplier<RepositoriesService> repositoriesServiceSupplier
64+
) {
65+
List<Object> components = new ArrayList<>();
66+
if (DiscoveryNode.isDataNode(environment.settings())) {
67+
// only data nodes should be eligible to run the maintenance service.
68+
AsyncTaskIndexService<AsyncSearchResponse> indexService = new AsyncTaskIndexService<>(
69+
XPackPlugin.ASYNC_RESULTS_INDEX,
70+
clusterService,
71+
threadPool.getThreadContext(),
72+
client,
73+
ASYNC_SEARCH_ORIGIN,
74+
AsyncSearchResponse::new,
75+
namedWriteableRegistry
76+
);
77+
AsyncTaskMaintenanceService maintenanceService = new AsyncTaskMaintenanceService(
78+
clusterService,
79+
nodeEnvironment.nodeId(),
80+
settings,
81+
threadPool,
82+
indexService
83+
);
84+
components.add(maintenanceService);
85+
}
86+
return components;
87+
}
88+
}

x-pack/plugin/async/src/main/plugin-metadata/plugin-security.policy

Whitespace-only changes.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.async;
8+
9+
import org.elasticsearch.common.settings.Settings;
10+
import org.elasticsearch.test.ESTestCase;
11+
import org.hamcrest.Matchers;
12+
13+
public class AsyncResultsIndexPluginTests extends ESTestCase {
14+
15+
public void testDummy() {
16+
// This is a dummy test case to satisfy the conventions
17+
AsyncResultsIndexPlugin plugin = new AsyncResultsIndexPlugin(Settings.EMPTY);
18+
assertThat(plugin.getSystemIndexDescriptors(Settings.EMPTY), Matchers.hasSize(1));
19+
}
20+
}

0 commit comments

Comments
 (0)