Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -97,20 +98,22 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final JobConfigProvider jobConfigProvider;
private final MlMemoryTracker memoryTracker;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final Client client;

@Inject
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
XPackLicenseState licenseState, ClusterService clusterService,
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker, Client client) {
super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,OpenJobAction.Request::new,
indexNameExpressionResolver);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.jobConfigProvider = jobConfigProvider;
this.memoryTracker = memoryTracker;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.client = client;
}

/**
Expand Down Expand Up @@ -224,7 +227,7 @@ protected void masterOperation(Task task, OpenJobAction.Request request, Cluster
ActionListener<NodeAcknowledgedResponse> clearJobFinishTime = ActionListener.wrap(
response -> {
if (response.isAcknowledged()) {
clearJobFinishedTime(response, jobParams.getJobId(), listener);
clearJobFinishedTime(response, state, jobParams.getJobId(), listener);
} else {
listener.onResponse(response);
}
Expand Down Expand Up @@ -309,17 +312,33 @@ public void onTimeout(TimeValue timeout) {
});
}

private void clearJobFinishedTime(NodeAcknowledgedResponse response, String jobId, ActionListener<NodeAcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(response),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);
// Not a critical error so continue
listener.onResponse(response);
}
));
private void clearJobFinishedTime(NodeAcknowledgedResponse response,
ClusterState clusterState,
String jobId,
ActionListener<NodeAcknowledgedResponse> listener) {
final JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
ActionListener<Job> clearedTimeListener = ActionListener.wrap(
job -> listener.onResponse(response),
e -> {
logger.error(new ParameterizedMessage("[{}] Failed to clear finished_time", jobId), e);
// Not a critical error so continue
listener.onResponse(response);
}
);
ActionListener<Boolean> mappingsUpdatedListener = ActionListener.wrap(
mappingUpdateResponse -> jobConfigProvider.updateJob(jobId, update, null, clearedTimeListener),
e -> {
logger.error(new ParameterizedMessage("[{}] Failed to update mapping; not clearing finished_time", jobId), e);
// Not a critical error so continue without attempting to clear finish time
listener.onResponse(response);
}
);
ElasticsearchMappings.addDocMappingIfMissing(
MlConfigIndex.indexName(),
MlConfigIndex::mapping,
client,
clusterState,
mappingsUpdatedListener);
}

private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
Expand Down