Skip to content

Commit 5384162

Browse files
authored
ML: creating ML State write alias and pointing writes there (#37483)
* ML: creating ML State write alias and pointing writes there
* Moving alias check to openJob method
* adjusting concrete index lookup for ml-state
1 parent 8da7a27 commit 5384162

File tree

16 files changed

+265
-121
lines changed

16 files changed

+265
-121
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,24 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.job.persistence;
77

8+
import org.elasticsearch.ResourceAlreadyExistsException;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.indices.alias.Alias;
11+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
12+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
13+
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
14+
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.client.Client;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
19+
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
23+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
24+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
25+
826
/**
927
* Methods for handling index naming related functions
1028
*/
@@ -40,11 +58,11 @@ public static String resultsWriteAlias(String jobId) {
4058
}
4159

4260
/**
43-
* The name of the default index where a job's state is stored
44-
* @return The index name
61+
* The name of the alias pointing to the appropriate index for writing job state
62+
* @return The write alias name
4563
*/
46-
public static String jobStateIndexName() {
47-
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX;
64+
public static String jobStateIndexWriteAlias() {
65+
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-write";
4866
}
4967

5068
/**
@@ -64,4 +82,65 @@ public static String configIndexName() {
6482
return AnomalyDetectorsIndexFields.CONFIG_INDEX;
6583
}
6684

85+
/**
86+
* Create the .ml-state index (if necessary)
87+
* Create the .ml-state-write alias for the .ml-state index (if necessary)
88+
*/
89+
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {
90+
91+
if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) {
92+
finalListener.onResponse(false);
93+
return;
94+
}
95+
96+
final ActionListener<String> createAliasListener = ActionListener.wrap(
97+
concreteIndexName -> {
98+
final IndicesAliasesRequest request = client.admin()
99+
.indices()
100+
.prepareAliases()
101+
.addAlias(concreteIndexName, jobStateIndexWriteAlias())
102+
.request();
103+
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
104+
ML_ORIGIN,
105+
request,
106+
ActionListener.<AcknowledgedResponse>wrap(
107+
resp -> finalListener.onResponse(resp.isAcknowledged()),
108+
finalListener::onFailure),
109+
client.admin().indices()::aliases);
110+
},
111+
finalListener::onFailure
112+
);
113+
114+
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
115+
String[] stateIndices = indexNameExpressionResolver.concreteIndexNames(state,
116+
IndicesOptions.lenientExpandOpen(),
117+
jobStateIndexPattern());
118+
if (stateIndices.length > 0) {
119+
Arrays.sort(stateIndices, Collections.reverseOrder());
120+
createAliasListener.onResponse(stateIndices[0]);
121+
} else {
122+
CreateIndexRequest createIndexRequest = client.admin()
123+
.indices()
124+
.prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
125+
.addAlias(new Alias(jobStateIndexWriteAlias()))
126+
.request();
127+
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
128+
ML_ORIGIN,
129+
createIndexRequest,
130+
ActionListener.<CreateIndexResponse>wrap(
131+
createIndexResponse -> finalListener.onResponse(true),
132+
createIndexFailure -> {
133+
// If the index was created between our last check and this request being handled, we should add the alias.
134+
// Adding an alias that already exists is idempotent, so there is no need to double-check whether the alias exists
135+
// as well.
136+
if (createIndexFailure instanceof ResourceAlreadyExistsException) {
137+
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
138+
} else {
139+
finalListener.onFailure(createIndexFailure);
140+
}
141+
}),
142+
client.admin().indices()::create);
143+
}
144+
}
145+
67146
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@
7373
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
7474
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
7575
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
76-
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
7776
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
7877
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
7978
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
@@ -762,7 +761,7 @@ public void testMachineLearningAdminRole() {
762761

763762
assertNoAccessAllowed(role, "foo");
764763
assertOnlyReadAllowed(role, MlMetaIndex.INDEX_NAME);
765-
assertOnlyReadAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
764+
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
766765
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
767766
assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
768767
}
@@ -814,7 +813,7 @@ public void testMachineLearningUserRole() {
814813

815814
assertNoAccessAllowed(role, "foo");
816815
assertNoAccessAllowed(role, MlMetaIndex.INDEX_NAME);
817-
assertNoAccessAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
816+
assertNoAccessAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
818817
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
819818
assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
820819
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.test.ESTestCase;
1717
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
1818
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
19+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
1920
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
2021

2122
import java.io.IOException;
@@ -30,13 +31,13 @@ public final class XPackRestTestHelper {
3031
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(
3132
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
3233
MlMetaIndex.INDEX_NAME,
33-
AnomalyDetectorsIndex.jobStateIndexName(),
34+
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
3435
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
3536

3637
public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(
3738
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
3839
MlMetaIndex.INDEX_NAME,
39-
AnomalyDetectorsIndex.jobStateIndexName(),
40+
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
4041
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
4142
AnomalyDetectorsIndex.configIndexName()));
4243

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void testDeleteExpiredData() throws Exception {
180180
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
181181
for (int i = 0; i < 10010; i++) {
182182
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
183-
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId);
183+
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
184184
indexRequest.source(Collections.emptyMap());
185185
bulkRequestBuilder.add(indexRequest);
186186
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
110110
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
111111
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
112+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
112113
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
113114
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
114115
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
@@ -701,7 +702,7 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
701702
}
702703

703704
try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
704-
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName())
705+
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
705706
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
706707
// TODO review these settings
707708
.settings(Settings.builder()
@@ -710,9 +711,9 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
710711
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(stateMapping))
711712
.version(Version.CURRENT.id)
712713
.build();
713-
templates.put(AnomalyDetectorsIndex.jobStateIndexName(), stateTemplate);
714+
templates.put(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, stateTemplate);
714715
} catch (IOException e) {
715-
logger.error("Error loading the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
716+
logger.error("Error loading the template for the " + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + " index", e);
716717
}
717718

718719
try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping()) {
@@ -742,7 +743,7 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
742743
public static boolean allTemplatesInstalled(ClusterState clusterState) {
743744
boolean allPresent = true;
744745
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
745-
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
746+
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
746747
for (String templateName : templateNames) {
747748
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
748749
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
439439

440440
logger.debug("taking a snapshot of ml_metadata");
441441
String documentId = "ml-config";
442-
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
442+
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
443443
ElasticsearchMappings.DOC_TYPE, documentId)
444444
.setOpType(DocWriteRequest.OpType.CREATE);
445445

@@ -456,8 +456,10 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
456456
return;
457457
}
458458

459-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
460-
ActionListener.<IndexResponse>wrap(
459+
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap(
460+
r -> {
461+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
462+
ActionListener.<IndexResponse>wrap(
461463
indexResponse -> {
462464
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
463465
},
@@ -469,8 +471,11 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
469471
listener.onFailure(e);
470472
}
471473
}),
472-
client::index
473-
);
474+
client::index
475+
);
476+
},
477+
listener::onFailure
478+
));
474479
}
475480

476481
private void createConfigIndex(ActionListener<Boolean> listener) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ public void onFailure(Exception e) {
529529
// Try adding state doc mapping
530530
ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
531531
response -> {
532-
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping,
532+
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping,
533533
state, jobUpdateListener);
534534
}, listener::onFailure
535535
);
@@ -673,6 +673,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
673673
private volatile int maxConcurrentJobAllocations;
674674
private volatile int maxMachineMemoryPercent;
675675
private volatile int maxLazyMLNodes;
676+
private volatile ClusterState clusterState;
676677

677678
public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
678679
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker,
@@ -689,6 +690,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
689690
clusterService.getClusterSettings()
690691
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
691692
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
693+
clusterService.addListener(event -> clusterState = event.state());
692694
}
693695

694696
@Override
@@ -748,7 +750,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
748750
}
749751

750752
String jobId = jobTask.getJobId();
751-
autodetectProcessManager.openJob(jobTask, e2 -> {
753+
autodetectProcessManager.openJob(jobTask, clusterState, e2 -> {
752754
if (e2 == null) {
753755
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
754756
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
2525
import org.elasticsearch.xpack.core.ml.job.config.JobState;
2626
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
27+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
2728
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2829
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2930
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -79,26 +80,38 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste
7980
logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
8081
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());
8182

82-
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
83-
exists -> {
84-
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
85-
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);
8683

87-
if (jobState.equals(JobState.CLOSED) == false) {
88-
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
84+
// 3. Revert the state
85+
ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
86+
exists -> {
87+
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
88+
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);
89+
90+
if (jobState.equals(JobState.CLOSED) == false) {
91+
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
92+
}
93+
94+
getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
95+
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
96+
if (request.getDeleteInterveningResults()) {
97+
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
98+
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
8999
}
100+
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
101+
}, listener::onFailure);
102+
},
103+
listener::onFailure
104+
);
105+
106+
107+
// 2. Verify the job exists
108+
ActionListener<Boolean> createStateIndexListener = ActionListener.wrap(
109+
r -> jobManager.jobExists(request.getJobId(), jobExistsListener),
110+
listener::onFailure
111+
);
90112

91-
getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
92-
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
93-
if (request.getDeleteInterveningResults()) {
94-
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
95-
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
96-
}
97-
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
98-
}, listener::onFailure);
99-
},
100-
listener::onFailure
101-
));
113+
// 1. Verify/Create the state index and its alias exists
114+
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, createStateIndexListener);
102115
}
103116

104117
private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public void persistCategoryDefinition(CategoryDefinition category) {
228228
*/
229229
public void persistQuantiles(Quantiles quantiles) {
230230
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
231-
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet();
231+
persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).actionGet();
232232
}
233233

234234
/**
@@ -237,7 +237,7 @@ public void persistQuantiles(Quantiles quantiles) {
237237
public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<IndexResponse> listener) {
238238
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
239239
persistable.setRefreshPolicy(refreshPolicy);
240-
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener);
240+
persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), listener);
241241
}
242242

243243
/**

0 commit comments

Comments
 (0)