Skip to content

Commit 26d0af7

Browse files
committed
Always re-run Feature migrations which have encountered errors (elastic#83918)
This PR addresses the behavior described in elastic#83917, in which Feature migrations that have encountered errors are not re-run in some cases. As of this PR, Features that have encountered errors during migration are treated the same as Features requiring migration. This PR also adds a test that artificially replicates elastic#83917.
1 parent d5c29ac commit 26d0af7

File tree

3 files changed

+81
-2
lines changed

3 files changed

+81
-2
lines changed

docs/changelog/83918.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 83918
2+
summary: Always re-run Feature migrations which have encountered errors
3+
area: Infra/Core
4+
type: bug
5+
issues:
6+
- 83917

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.action.support.ActiveShardCount;
2626
import org.elasticsearch.client.Client;
2727
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
29+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2830
import org.elasticsearch.cluster.metadata.IndexMetadata;
2931
import org.elasticsearch.cluster.metadata.Metadata;
3032
import org.elasticsearch.cluster.service.ClusterService;
@@ -37,6 +39,7 @@
3739
import org.elasticsearch.reindex.ReindexPlugin;
3840
import org.elasticsearch.test.ESIntegTestCase;
3941
import org.elasticsearch.upgrades.FeatureMigrationResults;
42+
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
4043
import org.elasticsearch.xcontent.XContentBuilder;
4144
import org.elasticsearch.xcontent.json.JsonXContent;
4245

@@ -50,6 +53,8 @@
5053
import java.util.Map;
5154
import java.util.Optional;
5255
import java.util.Set;
56+
import java.util.concurrent.CountDownLatch;
57+
import java.util.concurrent.TimeUnit;
5358
import java.util.concurrent.atomic.AtomicReference;
5459
import java.util.function.BiConsumer;
5560
import java.util.function.Function;
@@ -268,6 +273,67 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
268273
});
269274
}
270275

276+
public void testMigrationWillRunAfterError() throws Exception {
277+
createSystemIndexForDescriptor(INTERNAL_MANAGED);
278+
279+
TestPlugin.preMigrationHook.set((state) -> Collections.emptyMap());
280+
TestPlugin.postMigrationHook.set((state, metadata) -> {});
281+
282+
ensureGreen();
283+
284+
SetOnce<Exception> failure = new SetOnce<>();
285+
CountDownLatch clusterStateUpdated = new CountDownLatch(1);
286+
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
287+
.submitStateUpdateTask(this.getTestName(), new ClusterStateUpdateTask() {
288+
@Override
289+
public ClusterState execute(ClusterState currentState) throws Exception {
290+
FeatureMigrationResults newResults = new FeatureMigrationResults(
291+
Collections.singletonMap(
292+
FEATURE_NAME,
293+
SingleFeatureMigrationResult.failure(INTERNAL_MANAGED_INDEX_NAME, new RuntimeException("it failed :("))
294+
)
295+
);
296+
Metadata newMetadata = Metadata.builder(currentState.metadata())
297+
.putCustom(FeatureMigrationResults.TYPE, newResults)
298+
.build();
299+
return ClusterState.builder(currentState).metadata(newMetadata).build();
300+
}
301+
302+
@Override
303+
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
304+
clusterStateUpdated.countDown();
305+
}
306+
307+
@Override
308+
public void onFailure(Exception e) {
309+
failure.set(e);
310+
clusterStateUpdated.countDown();
311+
}
312+
}, ClusterStateTaskExecutor.unbatched());
313+
314+
clusterStateUpdated.await(10, TimeUnit.SECONDS); // Should be basically instantaneous
315+
if (failure.get() != null) {
316+
logger.error("cluster state update to inject migration failure state did not succeed", failure.get());
317+
fail("cluster state update failed, see log for details");
318+
}
319+
320+
PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest();
321+
PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get();
322+
// Make sure we actually started the migration
323+
assertTrue(
324+
"could not find [" + FEATURE_NAME + "] in response: " + Strings.toString(migrationResponse),
325+
migrationResponse.getFeatures().stream().anyMatch(feature -> feature.getFeatureName().equals(FEATURE_NAME))
326+
);
327+
328+
// Now wait for the migration to finish (otherwise the test infra explodes)
329+
assertBusy(() -> {
330+
GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest();
331+
GetFeatureUpgradeStatusResponse statusResp = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest).get();
332+
logger.info(Strings.toString(statusResp));
333+
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
334+
});
335+
}
336+
271337
public void assertIndexHasCorrectProperties(
272338
Metadata metadata,
273339
String indexName,
@@ -345,6 +411,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
345411
static final String FEATURE_NAME = "A-test-feature"; // Sorts alphabetically before the feature from MultiFeatureMigrationIT
346412
static final String ORIGIN = FeatureMigrationIT.class.getSimpleName();
347413
static final String FlAG_SETTING_KEY = IndexMetadata.INDEX_PRIORITY_SETTING.getKey();
414+
static final String INTERNAL_MANAGED_INDEX_NAME = ".int-man-old";
348415
static final int INDEX_DOC_COUNT = 100; // arbitrarily chosen
349416
public static final Version NEEDS_UPGRADE_VERSION = Version.V_7_0_0;
350417

@@ -355,7 +422,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
355422
static final SystemIndexDescriptor INTERNAL_MANAGED = SystemIndexDescriptor.builder()
356423
.setIndexPattern(".int-man-*")
357424
.setAliasName(".internal-managed-alias")
358-
.setPrimaryIndex(".int-man-old")
425+
.setPrimaryIndex(INTERNAL_MANAGED_INDEX_NAME)
359426
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
360427
.setSettings(createSimpleSettings(NEEDS_UPGRADE_VERSION, INTERNAL_MANAGED_FLAG_VALUE))
361428
.setMappings(createSimpleMapping(true, true))

server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportPostFeatureUpgradeAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams;
2929

3030
import java.util.Comparator;
31+
import java.util.EnumSet;
3132
import java.util.List;
33+
import java.util.Set;
3234
import java.util.stream.Collectors;
3335

3436
import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.getFeatureUpgradeStatus;
@@ -75,11 +77,15 @@ protected void masterOperation(
7577
ClusterState state,
7678
ActionListener<PostFeatureUpgradeResponse> listener
7779
) throws Exception {
80+
final Set<GetFeatureUpgradeStatusResponse.UpgradeStatus> upgradableStatuses = EnumSet.of(
81+
GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED,
82+
GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR
83+
);
7884
List<PostFeatureUpgradeResponse.Feature> featuresToMigrate = systemIndices.getFeatures()
7985
.values()
8086
.stream()
8187
.map(feature -> getFeatureUpgradeStatus(state, feature))
82-
.filter(status -> status.getUpgradeStatus().equals(GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED))
88+
.filter(status -> upgradableStatuses.contains(status.getUpgradeStatus()))
8389
.map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getFeatureName)
8490
.map(PostFeatureUpgradeResponse.Feature::new)
8591
.sorted(Comparator.comparing(PostFeatureUpgradeResponse.Feature::getFeatureName)) // consistent ordering to simplify testing

0 commit comments

Comments
 (0)