Skip to content

Commit cfafe3c

Browse files
committed
[ML] Change Datafeed actions to read config from the config index (#33273)
1 parent 2ec7a61 commit cfafe3c

File tree

14 files changed

+578
-187
lines changed

14 files changed

+578
-187
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.core.ml;
77

8-
import org.elasticsearch.ResourceAlreadyExistsException;
98
import org.elasticsearch.ResourceNotFoundException;
109
import org.elasticsearch.Version;
1110
import org.elasticsearch.cluster.AbstractDiffable;
@@ -295,7 +294,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
295294

296295
public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
297296
if (datafeeds.containsKey(datafeedConfig.getId())) {
298-
throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
297+
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
299298
}
300299
String jobId = datafeedConfig.getJobId();
301300
checkJobIsAvailableForDatafeed(jobId);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ private void addOptionalField(XContentBuilder builder, ParseField field, Object
204204
}
205205
}
206206

207-
String getJobId() {
207+
public String getJobId() {
208208
return jobId;
209209
}
210210

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public final class Messages {
4141
public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]";
4242
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
4343
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
44+
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
4445

4546
public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";
4647
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public static ResourceNotFoundException missingDatafeedException(String datafeed
3030
return new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
3131
}
3232

33+
public static ResourceAlreadyExistsException datafeedAlreadyExists(String datafeedId) {
34+
return new ResourceAlreadyExistsException(Messages.getMessage(Messages.DATAFEED_ID_ALREADY_TAKEN, datafeedId));
35+
}
36+
3337
public static ElasticsearchException serverError(String msg) {
3438
return new ElasticsearchException(msg);
3539
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,42 +11,48 @@
1111
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1212
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1313
import org.elasticsearch.client.Client;
14-
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1514
import org.elasticsearch.cluster.ClusterState;
1615
import org.elasticsearch.cluster.block.ClusterBlockException;
1716
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1817
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
19-
import org.elasticsearch.cluster.metadata.MetaData;
2018
import org.elasticsearch.cluster.service.ClusterService;
2119
import org.elasticsearch.common.inject.Inject;
2220
import org.elasticsearch.common.settings.Settings;
2321
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2422
import org.elasticsearch.persistent.PersistentTasksService;
23+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2524
import org.elasticsearch.threadpool.ThreadPool;
2625
import org.elasticsearch.transport.TransportService;
27-
import org.elasticsearch.xpack.core.XPackPlugin;
28-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2926
import org.elasticsearch.xpack.core.ml.MlTasks;
3027
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
3128
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
29+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
30+
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
31+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
32+
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3233

3334
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
3435
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
3536

3637
public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<DeleteDatafeedAction.Request, AcknowledgedResponse> {
3738

38-
private Client client;
39-
private PersistentTasksService persistentTasksService;
39+
private final Client client;
40+
private final DatafeedConfigProvider datafeedConfigProvider;
41+
private final ClusterService clusterService;
42+
private final PersistentTasksService persistentTasksService;
4043

4144
@Inject
4245
public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
4346
ThreadPool threadPool, ActionFilters actionFilters,
4447
IndexNameExpressionResolver indexNameExpressionResolver,
45-
Client client, PersistentTasksService persistentTasksService) {
48+
Client client, PersistentTasksService persistentTasksService,
49+
NamedXContentRegistry xContentRegistry) {
4650
super(settings, DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
4751
indexNameExpressionResolver, DeleteDatafeedAction.Request::new);
4852
this.client = client;
53+
this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
4954
this.persistentTasksService = persistentTasksService;
55+
this.clusterService = clusterService;
5056
}
5157

5258
@Override
@@ -65,14 +71,14 @@ protected void masterOperation(DeleteDatafeedAction.Request request, ClusterStat
6571
if (request.isForce()) {
6672
forceDeleteDatafeed(request, state, listener);
6773
} else {
68-
deleteDatafeedFromMetadata(request, listener);
74+
deleteDatafeedConfig(request, listener);
6975
}
7076
}
7177

7278
private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state,
7379
ActionListener<AcknowledgedResponse> listener) {
7480
ActionListener<Boolean> finalListener = ActionListener.wrap(
75-
response -> deleteDatafeedFromMetadata(request, listener),
81+
response -> deleteDatafeedConfig(request, listener),
7682
listener::onFailure
7783
);
7884

@@ -111,28 +117,19 @@ public void onFailure(Exception e) {
111117
}
112118
}
113119

114-
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
115-
clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(),
116-
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
117-
118-
@Override
119-
protected AcknowledgedResponse newResponse(boolean acknowledged) {
120-
return new AcknowledgedResponse(acknowledged);
121-
}
122-
123-
@Override
124-
public ClusterState execute(ClusterState currentState) {
125-
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
126-
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
127-
PersistentTasksCustomMetaData persistentTasks =
128-
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
129-
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
130-
.removeDatafeed(request.getDatafeedId(), persistentTasks).build();
131-
return ClusterState.builder(currentState).metaData(
132-
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
133-
.build();
134-
}
135-
});
120+
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
121+
// Check datafeed is stopped
122+
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
123+
if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) {
124+
listener.onFailure(ExceptionsHelper.conflictStatusException(
125+
Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED)));
126+
return;
127+
}
128+
129+
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
130+
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
131+
listener::onFailure
132+
));
136133
}
137134

138135
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,46 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.support.ActionFilters;
1010
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
11+
import org.elasticsearch.client.Client;
1112
import org.elasticsearch.cluster.ClusterState;
1213
import org.elasticsearch.cluster.block.ClusterBlockException;
1314
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1415
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.inject.Inject;
1718
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1820
import org.elasticsearch.threadpool.ThreadPool;
1921
import org.elasticsearch.transport.TransportService;
2022
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
2123
import org.elasticsearch.xpack.core.ml.MlMetadata;
2224
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
2325
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
26+
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
2427

2528
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.Comparator;
31+
import java.util.HashMap;
2632
import java.util.List;
33+
import java.util.Map;
2734
import java.util.Set;
2835

2936
public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction<GetDatafeedsAction.Request,
3037
GetDatafeedsAction.Response> {
3138

39+
private final DatafeedConfigProvider datafeedConfigProvider;
40+
3241
@Inject
3342
public TransportGetDatafeedsAction(Settings settings, TransportService transportService,
3443
ClusterService clusterService, ThreadPool threadPool,
3544
ActionFilters actionFilters,
36-
IndexNameExpressionResolver indexNameExpressionResolver) {
45+
IndexNameExpressionResolver indexNameExpressionResolver,
46+
Client client, NamedXContentRegistry xContentRegistry) {
3747
super(settings, GetDatafeedsAction.NAME, transportService, clusterService, threadPool, actionFilters,
3848
indexNameExpressionResolver, GetDatafeedsAction.Request::new);
49+
50+
datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
3951
}
4052

4153
@Override
@@ -50,18 +62,51 @@ protected GetDatafeedsAction.Response newResponse() {
5062

5163
@Override
5264
protected void masterOperation(GetDatafeedsAction.Request request, ClusterState state,
53-
ActionListener<GetDatafeedsAction.Response> listener) throws Exception {
65+
ActionListener<GetDatafeedsAction.Response> listener) {
5466
logger.debug("Get datafeed '{}'", request.getDatafeedId());
5567

56-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
57-
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
58-
List<DatafeedConfig> datafeedConfigs = new ArrayList<>();
68+
Map<String, DatafeedConfig> clusterStateConfigs =
69+
expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoDatafeeds(), state);
70+
71+
datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
72+
datafeedBuilders -> {
73+
// Check for duplicate datafeeds
74+
for (DatafeedConfig.Builder datafeed : datafeedBuilders) {
75+
if (clusterStateConfigs.containsKey(datafeed.getId())) {
76+
listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " +
77+
"exists in both clusterstate and index"));
78+
return;
79+
}
80+
}
81+
82+
// Merge cluster state and index configs
83+
List<DatafeedConfig> datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size());
84+
for (DatafeedConfig.Builder builder: datafeedBuilders) {
85+
datafeeds.add(builder.build());
86+
}
87+
88+
datafeeds.addAll(clusterStateConfigs.values());
89+
Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId));
90+
listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(),
91+
DatafeedConfig.RESULTS_FIELD)));
92+
},
93+
listener::onFailure
94+
));
95+
}
96+
97+
Map<String, DatafeedConfig> expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoDatafeeds,
98+
ClusterState clusterState) {
99+
100+
Map<String, DatafeedConfig> configById = new HashMap<>();
101+
102+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
103+
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds);
104+
59105
for (String expandedDatafeedId : expandedDatafeedIds) {
60-
datafeedConfigs.add(mlMetadata.getDatafeed(expandedDatafeedId));
106+
configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId));
61107
}
62108

63-
listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeedConfigs, datafeedConfigs.size(),
64-
DatafeedConfig.RESULTS_FIELD)));
109+
return configById;
65110
}
66111

67112
@Override

0 commit comments

Comments
 (0)