Skip to content

Commit 131f2e5

Browse files
authored
[ML] ensure config index is updated before clearing finished_time (#61064)
When a user upgrades between versions, they may stop their ML jobs. Then when the upgrade is complete, they will want to open the jobs again. But, when opening a job, we attempt to clear out the jobs finished_time. If the job configuration has adjusted between the versions (i.e. added a new field), it will dynamically update the .ml-config index. We should instead manually change the mapping to be the updated version.
1 parent 291f5bd commit 131f2e5

File tree

1 file changed

+32
-13
lines changed

1 file changed

+32
-13
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
5656
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
5757
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
58+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
5859
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
5960
import org.elasticsearch.xpack.ml.MachineLearning;
6061
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -97,20 +98,22 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
9798
private final JobConfigProvider jobConfigProvider;
9899
private final MlMemoryTracker memoryTracker;
99100
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
101+
private final Client client;
100102

101103
@Inject
102104
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
103105
XPackLicenseState licenseState, ClusterService clusterService,
104106
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
105107
IndexNameExpressionResolver indexNameExpressionResolver,
106-
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
108+
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker, Client client) {
107109
super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,OpenJobAction.Request::new,
108110
indexNameExpressionResolver);
109111
this.licenseState = licenseState;
110112
this.persistentTasksService = persistentTasksService;
111113
this.jobConfigProvider = jobConfigProvider;
112114
this.memoryTracker = memoryTracker;
113115
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
116+
this.client = client;
114117
}
115118

116119
/**
@@ -224,7 +227,7 @@ protected void masterOperation(Task task, OpenJobAction.Request request, Cluster
224227
ActionListener<NodeAcknowledgedResponse> clearJobFinishTime = ActionListener.wrap(
225228
response -> {
226229
if (response.isAcknowledged()) {
227-
clearJobFinishedTime(response, jobParams.getJobId(), listener);
230+
clearJobFinishedTime(response, state, jobParams.getJobId(), listener);
228231
} else {
229232
listener.onResponse(response);
230233
}
@@ -309,17 +312,33 @@ public void onTimeout(TimeValue timeout) {
309312
});
310313
}
311314

312-
private void clearJobFinishedTime(NodeAcknowledgedResponse response, String jobId, ActionListener<NodeAcknowledgedResponse> listener) {
313-
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
314-
315-
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
316-
job -> listener.onResponse(response),
317-
e -> {
318-
logger.error("[" + jobId + "] Failed to clear finished_time", e);
319-
// Not a critical error so continue
320-
listener.onResponse(response);
321-
}
322-
));
315+
private void clearJobFinishedTime(NodeAcknowledgedResponse response,
316+
ClusterState clusterState,
317+
String jobId,
318+
ActionListener<NodeAcknowledgedResponse> listener) {
319+
final JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
320+
ActionListener<Job> clearedTimeListener = ActionListener.wrap(
321+
job -> listener.onResponse(response),
322+
e -> {
323+
logger.error(new ParameterizedMessage("[{}] Failed to clear finished_time", jobId), e);
324+
// Not a critical error so continue
325+
listener.onResponse(response);
326+
}
327+
);
328+
ActionListener<Boolean> mappingsUpdatedListener = ActionListener.wrap(
329+
mappingUpdateResponse -> jobConfigProvider.updateJob(jobId, update, null, clearedTimeListener),
330+
e -> {
331+
logger.error(new ParameterizedMessage("[{}] Failed to update mapping; not clearing finished_time", jobId), e);
332+
// Not a critical error so continue without attempting to clear finish time
333+
listener.onResponse(response);
334+
}
335+
);
336+
ElasticsearchMappings.addDocMappingIfMissing(
337+
MlConfigIndex.indexName(),
338+
MlConfigIndex::mapping,
339+
client,
340+
clusterState,
341+
mappingsUpdatedListener);
323342
}
324343

325344
private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,

0 commit comments

Comments
 (0)