Skip to content
6 changes: 6 additions & 0 deletions docs/changelog/83918.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 83918
summary: Always re-run Feature migrations which have encountered errors
area: Infra/Core
type: bug
issues:
- 83917
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.upgrades.FeatureMigrationResults;
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

Expand Down Expand Up @@ -267,6 +270,62 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
});
}

public void testMigrationWillRunAfterError() throws Exception {
createSystemIndexForDescriptor(INTERNAL_MANAGED);

TestPlugin.preMigrationHook.set((state) -> Collections.emptyMap());
TestPlugin.postMigrationHook.set((state, metadata) -> {});

ensureGreen();

SetOnce<Boolean> clusterStateUpdated = new SetOnce<>();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.submitStateUpdateTask(this.getTestName(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
FeatureMigrationResults newResults = new FeatureMigrationResults(
Collections.singletonMap(
FEATURE_NAME,
SingleFeatureMigrationResult.failure(INTERNAL_MANAGED_INDEX_NAME, new RuntimeException("it failed :("))
)
);
Metadata newMetadata = Metadata.builder(currentState.metadata())
.putCustom(FeatureMigrationResults.TYPE, newResults)
.build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
clusterStateUpdated.set(true);
}

@Override
public void onFailure(Exception e) {
clusterStateUpdated.set(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be captured and cause the assertBusy to fail with an assertion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Good point.

}
}, ClusterStateTaskExecutor.unbatched());

assertBusy(() -> assertTrue(Optional.ofNullable(clusterStateUpdated.get()).orElse(false)));

PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest();
PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get();
// Make sure we actually started the migration
final Set<String> migratingFeatures = migrationResponse.getFeatures()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting to a set isn't anymore efficient than just looking through the list. Is there som other reason we need an intermediate set to assert the feature was processed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a copy/paste from another test, you're right there's not much point to converting to a set here. I'll simplify.

.stream()
.map(PostFeatureUpgradeResponse.Feature::getFeatureName)
.collect(Collectors.toSet());
assertThat(migratingFeatures, hasItem(FEATURE_NAME));

// Now wait for the migration to finish (otherwise the test infra explodes)
assertBusy(() -> {
GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest();
GetFeatureUpgradeStatusResponse statusResp = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
});
}

public void assertIndexHasCorrectProperties(
Metadata metadata,
String indexName,
Expand Down Expand Up @@ -344,6 +403,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
static final String FEATURE_NAME = "A-test-feature"; // Sorts alphabetically before the feature from MultiFeatureMigrationIT
static final String ORIGIN = FeatureMigrationIT.class.getSimpleName();
static final String FlAG_SETTING_KEY = IndexMetadata.INDEX_PRIORITY_SETTING.getKey();
static final String INTERNAL_MANAGED_INDEX_NAME = ".int-man-old";
static final int INDEX_DOC_COUNT = 100; // arbitrarily chosen
public static final Version NEEDS_UPGRADE_VERSION = Version.V_7_0_0;

Expand All @@ -354,7 +414,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
static final SystemIndexDescriptor INTERNAL_MANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".int-man-*")
.setAliasName(".internal-managed-alias")
.setPrimaryIndex(".int-man-old")
.setPrimaryIndex(INTERNAL_MANAGED_INDEX_NAME)
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
.setSettings(createSimpleSettings(NEEDS_UPGRADE_VERSION, INTERNAL_MANAGED_FLAG_VALUE))
.setMappings(createSimpleMapping(true, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams;

import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.getFeatureUpgradeStatus;
Expand Down Expand Up @@ -75,11 +77,15 @@ protected void masterOperation(
ClusterState state,
ActionListener<PostFeatureUpgradeResponse> listener
) throws Exception {
final Set<GetFeatureUpgradeStatusResponse.UpgradeStatus> upgradableStatuses = EnumSet.of(
GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED,
GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR
);
List<PostFeatureUpgradeResponse.Feature> featuresToMigrate = systemIndices.getFeatures()
.values()
.stream()
.map(feature -> getFeatureUpgradeStatus(state, feature))
.filter(status -> status.getUpgradeStatus().equals(GetFeatureUpgradeStatusResponse.UpgradeStatus.MIGRATION_NEEDED))
.filter(status -> upgradableStatuses.contains(status.getUpgradeStatus()))
.map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getFeatureName)
.map(PostFeatureUpgradeResponse.Feature::new)
.sorted(Comparator.comparing(PostFeatureUpgradeResponse.Feature::getFeatureName)) // consistent ordering to simplify testing
Expand Down