Skip to content

Commit eb6b75e

Browse files
authored
[ML] Job in index: Enable delete actions for clusterstate config (#35590)
1 parent 971681e commit eb6b75e

File tree

15 files changed

+373
-54
lines changed

15 files changed

+373
-54
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ public final class Messages {
5050
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
5151
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
5252

53-
public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";
53+
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
5454
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
55+
public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";
5556

5657
public static final String INCONSISTENT_ID =
5758
"Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1212
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1313
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1415
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.block.ClusterBlockException;
1617
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1718
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
19+
import org.elasticsearch.cluster.metadata.MetaData;
1820
import org.elasticsearch.cluster.service.ClusterService;
1921
import org.elasticsearch.common.inject.Inject;
2022
import org.elasticsearch.common.settings.Settings;
@@ -23,6 +25,8 @@
2325
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2426
import org.elasticsearch.threadpool.ThreadPool;
2527
import org.elasticsearch.transport.TransportService;
28+
import org.elasticsearch.xpack.core.XPackPlugin;
29+
import org.elasticsearch.xpack.core.ml.MlMetadata;
2630
import org.elasticsearch.xpack.core.ml.MlTasks;
2731
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
2832
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
@@ -67,18 +71,18 @@ protected AcknowledgedResponse newResponse() {
6771

6872
@Override
6973
protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
70-
ActionListener<AcknowledgedResponse> listener) throws Exception {
74+
ActionListener<AcknowledgedResponse> listener) {
7175
if (request.isForce()) {
7276
forceDeleteDatafeed(request, state, listener);
7377
} else {
74-
deleteDatafeedConfig(request, listener);
78+
deleteDatafeedConfig(request, state, listener);
7579
}
7680
}
7781

7882
private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state,
7983
ActionListener<AcknowledgedResponse> listener) {
8084
ActionListener<Boolean> finalListener = ActionListener.wrap(
81-
response -> deleteDatafeedConfig(request, listener),
85+
response -> deleteDatafeedConfig(request, state, listener),
8286
listener::onFailure
8387
);
8488

@@ -117,7 +121,8 @@ public void onFailure(Exception e) {
117121
}
118122
}
119123

120-
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
124+
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterState state,
125+
ActionListener<AcknowledgedResponse> listener) {
121126
// Check datafeed is stopped
122127
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
123128
if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) {
@@ -126,10 +131,39 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionLi
126131
return;
127132
}
128133

129-
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
130-
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
131-
listener::onFailure
132-
));
134+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
135+
if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) {
136+
deleteDatafeedFromMetadata(request, listener);
137+
} else {
138+
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
139+
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
140+
listener::onFailure
141+
));
142+
}
143+
}
144+
145+
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
146+
clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(),
147+
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
148+
149+
@Override
150+
protected AcknowledgedResponse newResponse(boolean acknowledged) {
151+
return new AcknowledgedResponse(acknowledged);
152+
}
153+
154+
@Override
155+
public ClusterState execute(ClusterState currentState) {
156+
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
157+
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
158+
PersistentTasksCustomMetaData persistentTasks =
159+
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
160+
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
161+
.removeDatafeed(request.getDatafeedId(), persistentTasks).build();
162+
return ClusterState.builder(currentState).metaData(
163+
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
164+
.build();
165+
}
166+
});
133167
}
134168

135169
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener
5656
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
5757
Auditor auditor = new Auditor(client, clusterService.getNodeName());
5858
List<MlDataRemover> dataRemovers = Arrays.asList(
59-
new ExpiredResultsRemover(client, auditor),
59+
new ExpiredResultsRemover(client, clusterService, auditor),
6060
new ExpiredForecastsRemover(client, threadPool),
61-
new ExpiredModelSnapshotsRemover(client, threadPool),
61+
new ExpiredModelSnapshotsRemover(client, clusterService, threadPool),
6262
new UnusedStateRemover(client, clusterService)
6363
);
6464
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,51 +16,68 @@
1616
import org.elasticsearch.action.support.WriteRequest;
1717
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1818
import org.elasticsearch.client.Client;
19+
import org.elasticsearch.cluster.ClusterState;
1920
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
21+
import org.elasticsearch.cluster.service.ClusterService;
2022
import org.elasticsearch.common.inject.Inject;
2123
import org.elasticsearch.common.settings.Settings;
2224
import org.elasticsearch.rest.RestStatus;
2325
import org.elasticsearch.threadpool.ThreadPool;
2426
import org.elasticsearch.transport.TransportService;
2527
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
28+
import org.elasticsearch.xpack.core.ml.MlMetadata;
2629
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
2730
import org.elasticsearch.xpack.core.ml.job.config.Detector;
2831
import org.elasticsearch.xpack.core.ml.job.config.Job;
2932
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
33+
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3034
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3135
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
3236

3337
import java.util.ArrayList;
38+
import java.util.Collection;
3439
import java.util.List;
40+
import java.util.Map;
3541

3642
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
3743
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
3844

3945
public class TransportDeleteFilterAction extends HandledTransportAction<DeleteFilterAction.Request, AcknowledgedResponse> {
4046

4147
private final Client client;
48+
private final ClusterService clusterService;
4249
private final JobConfigProvider jobConfigProvider;
4350

4451
@Inject
4552
public TransportDeleteFilterAction(Settings settings, ThreadPool threadPool,
4653
TransportService transportService, ActionFilters actionFilters,
4754
IndexNameExpressionResolver indexNameExpressionResolver,
48-
Client client, JobConfigProvider jobConfigProvider) {
55+
Client client, ClusterService clusterService,
56+
JobConfigProvider jobConfigProvider) {
4957
super(settings, DeleteFilterAction.NAME, threadPool, transportService, actionFilters,
5058
indexNameExpressionResolver, DeleteFilterAction.Request::new);
5159
this.client = client;
60+
this.clusterService = clusterService;
5261
this.jobConfigProvider = jobConfigProvider;
5362
}
5463

5564
@Override
5665
protected void doExecute(DeleteFilterAction.Request request, ActionListener<AcknowledgedResponse> listener) {
5766
final String filterId = request.getFilterId();
67+
68+
List<String> clusterStateJobsUsingFilter = clusterStateJobsUsingFilter(filterId, clusterService.state());
69+
if (clusterStateJobsUsingFilter.isEmpty() == false) {
70+
listener.onFailure(ExceptionsHelper.conflictStatusException(
71+
Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, clusterStateJobsUsingFilter)));
72+
return;
73+
}
74+
5875
jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap(
5976
jobs-> {
6077
List<String> currentlyUsedBy = findJobsUsingFilter(jobs, filterId);
6178
if (!currentlyUsedBy.isEmpty()) {
6279
listener.onFailure(ExceptionsHelper.conflictStatusException(
63-
"Cannot delete filter, currently used by jobs: " + currentlyUsedBy));
80+
Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, currentlyUsedBy)));
6481
} else {
6582
deleteFilter(filterId, listener);
6683
}
@@ -70,7 +87,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener<Ackn
7087
);
7188
}
7289

73-
private static List<String> findJobsUsingFilter(List<Job> jobs, String filterId) {
90+
private static List<String> findJobsUsingFilter(Collection<Job> jobs, String filterId) {
7491
List<String> currentlyUsedBy = new ArrayList<>();
7592
for (Job job : jobs) {
7693
List<Detector> detectors = job.getAnalysisConfig().getDetectors();
@@ -84,6 +101,11 @@ private static List<String> findJobsUsingFilter(List<Job> jobs, String filterId)
84101
return currentlyUsedBy;
85102
}
86103

104+
private static List<String> clusterStateJobsUsingFilter(String filterId, ClusterState state) {
105+
Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
106+
return findJobsUsingFilter(jobs.values(), filterId);
107+
}
108+
87109
private void deleteFilter(String filterId, ActionListener<AcknowledgedResponse> listener) {
88110
DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE,
89111
MlFilter.documentId(filterId));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
6666
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
6767
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
68-
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
68+
import org.elasticsearch.xpack.ml.job.JobManager;
6969
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
7070
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
7171
import org.elasticsearch.xpack.ml.notifications.Auditor;
@@ -93,7 +93,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
9393
private final PersistentTasksService persistentTasksService;
9494
private final Auditor auditor;
9595
private final JobResultsProvider jobResultsProvider;
96-
private final JobConfigProvider jobConfigProvider;
96+
private final JobManager jobManager;
9797
private final DatafeedConfigProvider datafeedConfigProvider;
9898
private final MlMemoryTracker memoryTracker;
9999

@@ -110,15 +110,15 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
110110
ThreadPool threadPool, ActionFilters actionFilters,
111111
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
112112
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
113-
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
113+
JobManager jobManager, DatafeedConfigProvider datafeedConfigProvider,
114114
MlMemoryTracker memoryTracker) {
115115
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
116116
indexNameExpressionResolver, DeleteJobAction.Request::new);
117117
this.client = client;
118118
this.persistentTasksService = persistentTasksService;
119119
this.auditor = auditor;
120120
this.jobResultsProvider = jobResultsProvider;
121-
this.jobConfigProvider = jobConfigProvider;
121+
this.jobManager = jobManager;
122122
this.datafeedConfigProvider = datafeedConfigProvider;
123123
this.memoryTracker = memoryTracker;
124124
this.listenersByJobId = new HashMap<>();
@@ -189,7 +189,15 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
189189
finalListener.onFailure(e);
190190
});
191191

192-
markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener);
192+
ActionListener<Boolean> checkForDatafeedsListener = ActionListener.wrap(
193+
ok -> jobManager.markJobAsDeleting(request.getJobId(), request.isForce(), markAsDeletingListener),
194+
finalListener::onFailure
195+
);
196+
197+
// This check only applies to index configurations.
198+
// ClusterState config makes the same check against the
199+
// job being used by a datafeed in MlMetadata.markJobAsDeleting()
200+
checkJobNotUsedByDatafeed(request.getJobId(), checkForDatafeedsListener);
193201
}
194202

195203
private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) {
@@ -231,7 +239,7 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ
231239
// Step 3. When the physical storage has been deleted, delete the job config document
232240
// -------
233241
// Don't report an error if the document has already been deleted
234-
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
242+
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobManager.deleteJob(request,
235243
ActionListener.wrap(
236244
deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
237245
listener::onFailure
@@ -332,9 +340,8 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
332340
);
333341

334342
// Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
335-
ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
336-
builder -> {
337-
Job job = builder.build();
343+
ActionListener<Job> getJobHandler = ActionListener.wrap(
344+
job -> {
338345
indexName.set(job.getResultsIndexName());
339346
if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
340347
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
@@ -357,7 +364,7 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
357364
// Step 4. Get the job as the result index name is required
358365
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
359366
response -> {
360-
jobConfigProvider.getJob(jobId, getJobHandler);
367+
jobManager.getJob(jobId, getJobHandler);
361368
},
362369
failureHandler
363370
);
@@ -574,16 +581,15 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) {
574581
}
575582
}
576583

577-
private void markJobAsDeletingIfNotUsed(String jobId, ActionListener<Boolean> listener) {
578-
584+
private void checkJobNotUsedByDatafeed(String jobId, ActionListener<Boolean> listener) {
579585
datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
580586
datafeedIds -> {
581587
if (datafeedIds.isEmpty() == false) {
582588
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
583589
+ datafeedIds.iterator().next() + "] refers to it"));
584590
return;
585591
}
586-
jobConfigProvider.markJobAsDeleting(jobId, listener);
592+
listener.onResponse(Boolean.TRUE);
587593
},
588594
listener::onFailure
589595
));

0 commit comments

Comments
 (0)