Skip to content

Commit 9a1b2eb

Browse files
authored
[Ml] Prevent config snapshot failure blocking migration (#37493)
1 parent 0721448 commit 9a1b2eb

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.xcontent.XContentBuilder;
3434
import org.elasticsearch.common.xcontent.XContentFactory;
3535
import org.elasticsearch.index.IndexSettings;
36+
import org.elasticsearch.index.engine.VersionConflictEngineException;
3637
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3738
import org.elasticsearch.xpack.core.ml.MlMetadata;
3839
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -369,7 +370,14 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
369370
indexResponse -> {
370371
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
371372
},
372-
listener::onFailure),
373+
e -> {
374+
if (e instanceof VersionConflictEngineException) {
375+
// the snapshot already exists
376+
listener.onResponse(Boolean.TRUE);
377+
} else {
378+
listener.onFailure(e);
379+
}
380+
}),
373381
client::index
374382
);
375383
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.DocWriteRequest;
910
import org.elasticsearch.action.get.GetResponse;
11+
import org.elasticsearch.action.index.IndexRequestBuilder;
1012
import org.elasticsearch.action.index.IndexResponse;
13+
import org.elasticsearch.action.support.WriteRequest;
1114
import org.elasticsearch.cluster.ClusterName;
1215
import org.elasticsearch.cluster.ClusterState;
1316
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -178,6 +181,59 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
178181
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
179182
}
180183

184+
public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException {
185+
// index a doc with the same Id as the config snapshot
186+
IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
187+
ElasticsearchMappings.DOC_TYPE, "ml-config")
188+
.setSource(Collections.singletonMap("a_field", "a_value"))
189+
.setOpType(DocWriteRequest.OpType.CREATE)
190+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
191+
192+
indexRequest.execute().actionGet();
193+
194+
// define the configs
195+
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
196+
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
197+
198+
MetaData.Builder metaData = MetaData.builder();
199+
RoutingTable.Builder routingTable = RoutingTable.builder();
200+
addMlConfigIndex(metaData, routingTable);
201+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
202+
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
203+
.routingTable(routingTable.build())
204+
.build();
205+
206+
doAnswer(invocation -> {
207+
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
208+
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
209+
return null;
210+
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());
211+
212+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
213+
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
214+
215+
// do the migration
216+
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
217+
// writing the snapshot should fail because the doc already exists
218+
// in which case the migration should continue
219+
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
220+
responseHolder, exceptionHolder);
221+
222+
assertNull(exceptionHolder.get());
223+
assertTrue(responseHolder.get());
224+
225+
// check the jobs have been migrated
226+
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
227+
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
228+
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
229+
jobsHolder, exceptionHolder);
230+
231+
assertNull(exceptionHolder.get());
232+
assertThat(jobsHolder.get(), hasSize(1));
233+
assertTrue(jobsHolder.get().get(0).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION));
234+
assertEquals("job-foo", jobsHolder.get().get(0).build().getId());
235+
}
236+
181237
public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
182238
int jobCount = randomIntBetween(150, 201);
183239
int datafeedCount = randomIntBetween(150, jobCount);

0 commit comments

Comments
 (0)