Skip to content

Commit a497263

Browse files
authored
[ML] ensure config index is updated before clearing finished_time (#61064) (#61085)
When a user upgrades between versions, they may stop their ML jobs. Then when the upgrade is complete, they will want to open the jobs again. But, when opening a job, we attempt to clear out the jobs finished_time. If the job configuration has adjusted between the versions (i.e. added a new field), it will dynamically update the .ml-config index. We should instead manually change the mapping to be the updated version.
1 parent 0549c40 commit a497263

File tree

1 file changed

+32
-13
lines changed

1 file changed

+32
-13
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
5555
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
5656
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
57+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
5758
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
5859
import org.elasticsearch.xpack.ml.MachineLearning;
5960
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -96,20 +97,22 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
9697
private final JobConfigProvider jobConfigProvider;
9798
private final MlMemoryTracker memoryTracker;
9899
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
100+
private final Client client;
99101

100102
@Inject
101103
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
102104
XPackLicenseState licenseState, ClusterService clusterService,
103105
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
104106
IndexNameExpressionResolver indexNameExpressionResolver,
105-
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
107+
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker, Client client) {
106108
super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,OpenJobAction.Request::new,
107109
indexNameExpressionResolver);
108110
this.licenseState = licenseState;
109111
this.persistentTasksService = persistentTasksService;
110112
this.jobConfigProvider = jobConfigProvider;
111113
this.memoryTracker = memoryTracker;
112114
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
115+
this.client = client;
113116
}
114117

115118
/**
@@ -222,7 +225,7 @@ protected void masterOperation(OpenJobAction.Request request, ClusterState state
222225
ActionListener<NodeAcknowledgedResponse> clearJobFinishTime = ActionListener.wrap(
223226
response -> {
224227
if (response.isAcknowledged()) {
225-
clearJobFinishedTime(response, jobParams.getJobId(), listener);
228+
clearJobFinishedTime(response, state, jobParams.getJobId(), listener);
226229
} else {
227230
listener.onResponse(response);
228231
}
@@ -307,17 +310,33 @@ public void onTimeout(TimeValue timeout) {
307310
});
308311
}
309312

310-
private void clearJobFinishedTime(NodeAcknowledgedResponse response, String jobId, ActionListener<NodeAcknowledgedResponse> listener) {
311-
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
312-
313-
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
314-
job -> listener.onResponse(response),
315-
e -> {
316-
logger.error("[" + jobId + "] Failed to clear finished_time", e);
317-
// Not a critical error so continue
318-
listener.onResponse(response);
319-
}
320-
));
313+
private void clearJobFinishedTime(NodeAcknowledgedResponse response,
314+
ClusterState clusterState,
315+
String jobId,
316+
ActionListener<NodeAcknowledgedResponse> listener) {
317+
final JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
318+
ActionListener<Job> clearedTimeListener = ActionListener.wrap(
319+
job -> listener.onResponse(response),
320+
e -> {
321+
logger.error(new ParameterizedMessage("[{}] Failed to clear finished_time", jobId), e);
322+
// Not a critical error so continue
323+
listener.onResponse(response);
324+
}
325+
);
326+
ActionListener<Boolean> mappingsUpdatedListener = ActionListener.wrap(
327+
mappingUpdateResponse -> jobConfigProvider.updateJob(jobId, update, null, clearedTimeListener),
328+
e -> {
329+
logger.error(new ParameterizedMessage("[{}] Failed to update mapping; not clearing finished_time", jobId), e);
330+
// Not a critical error so continue without attempting to clear finish time
331+
listener.onResponse(response);
332+
}
333+
);
334+
ElasticsearchMappings.addDocMappingIfMissing(
335+
MlConfigIndex.indexName(),
336+
MlConfigIndex::mapping,
337+
client,
338+
clusterState,
339+
mappingsUpdatedListener);
321340
}
322341

323342
private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,

0 commit comments

Comments
 (0)