Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -25,6 +26,8 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.FROZEN_PHASE;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
* {@link org.elasticsearch.xpack.core.DataTier}s.
Expand All @@ -33,6 +36,7 @@ public class MigrateAction implements LifecycleAction {
public static final String NAME = "migrate";
public static final ParseField ENABLED_FIELD = new ParseField("enabled");

static final String CONDITIONAL_SKIP_MIGRATE_STEP = BranchingStep.NAME + "-check-existing-routing";
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
private static final List<String> FROZEN_TO_HOT_TIERS =
List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
Expand Down Expand Up @@ -92,22 +96,33 @@ public boolean isSafeAction() {
@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
if (enabled) {
StepKey preMigrateBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_MIGRATE_STEP);
StepKey migrationKey = new StepKey(phase, NAME, NAME);
StepKey migrationRoutedKey = new StepKey(phase, NAME, DataTierMigrationRoutedStep.NAME);

Settings.Builder migrationSettings = Settings.builder();
String dataTierName = "data_" + phase;
assert DataTier.validTierName(dataTierName) : "invalid data tier name:" + dataTierName;
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(dataTierName));
String targetTier = "data_" + phase;
assert DataTier.validTierName(targetTier) : "invalid data tier name:" + targetTier;

BranchingStep conditionalSkipActionStep = new BranchingStep(preMigrateBranchingKey, migrationKey, nextStepKey,
(index, clusterState) -> skipMigrateAction(phase, clusterState.metadata().index(index)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some debug logging about how we're skipping the migration because of XYZ for this? I think it'd be helpful if someone is trying to diagnose why migrate behaves the way it does.

migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(targetTier));
UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client,
migrationSettings.build());
DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey);
return Arrays.asList(updateMigrationSettingStep, migrationRoutedStep);
return Arrays.asList(conditionalSkipActionStep, updateMigrationSettingStep, migrationRoutedStep);
} else {
return List.of();
}
}

static boolean skipMigrateAction(String phase, IndexMetadata indexMetadata) {
// if the index is a searchable snapshot we skip the migrate action (as mounting an index as searchable snapshot
// configures the tier allocation preference), unless we're in the frozen phase
return (indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME) != null)
&& (phase.equals(FROZEN_PHASE) == false);
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class TimeseriesLifecycleType implements LifecycleType {
static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
static final List<String> ORDERED_VALID_COLD_ACTIONS;
static final List<String> ORDERED_VALID_FROZEN_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
static final List<String> ORDERED_VALID_FROZEN_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME,
ReadOnlyAction.NAME, SearchableSnapshotAction.NAME, AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME);
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
static final Set<String> VALID_HOT_ACTIONS;
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
Expand All @@ -67,13 +67,13 @@ public class TimeseriesLifecycleType implements LifecycleType {
if (RollupV2.isEnabled()) {
ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
ReadOnlyAction.NAME, RollupILMAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
MigrateAction.NAME, FreezeAction.NAME, RollupILMAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, SearchableSnapshotAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME, RollupILMAction.NAME);
} else {
ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
ReadOnlyAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, SearchableSnapshotAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME);
}
VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
Expand Down Expand Up @@ -133,6 +133,11 @@ static boolean shouldInjectMigrateStepForPhase(Phase phase) {
}
}

if (phase.getActions().get(SearchableSnapshotAction.NAME) != null) {
// the `searchable_snapshot` action defines migration rules itself, so no need to inject a migrate action
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we still want to inject a migrate action for the frozen phase, so that a searchable snapshot created in the cold phase transitions to frozen nodes when it reaches the frozen phase, even if there is a searchable snapshot action (because the searchable snapshot action could end up being a noop if the settings are the same)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, good shout!


MigrateAction migrateAction = (MigrateAction) phase.getActions().get(MigrateAction.NAME);
// if the user configured the {@link MigrateAction} already we won't automatically configure it
return migrateAction == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,29 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.skipMigrateAction;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.FROZEN_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.WARM_PHASE;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -49,15 +58,18 @@ public void testToSteps() {
MigrateAction action = new MigrateAction();
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertNotNull(steps);
assertEquals(2, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, MigrateAction.NAME, MigrateAction.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, MigrateAction.NAME, DataTierMigrationRoutedStep.NAME);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
DataTierMigrationRoutedStep secondStep = (DataTierMigrationRoutedStep) steps.get(1);
assertEquals(3, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, MigrateAction.NAME, MigrateAction.CONDITIONAL_SKIP_MIGRATE_STEP);
StepKey expectedSecondStepKey = new StepKey(phase, MigrateAction.NAME, MigrateAction.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, MigrateAction.NAME, DataTierMigrationRoutedStep.NAME);
BranchingStep firstStep = (BranchingStep) steps.get(0);
UpdateSettingsStep secondStep = (UpdateSettingsStep) steps.get(1);
DataTierMigrationRoutedStep thirdStep = (DataTierMigrationRoutedStep) steps.get(2);
assertEquals(expectedFirstStepKey, firstStep.getKey());
assertEquals(expectedSecondStepKey, firstStep.getNextStepKey());
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(nextStepKey, secondStep.getNextStepKey());
assertEquals(expectedThirdStepKey, secondStep.getNextStepKey());
assertEquals(expectedThirdStepKey, thirdStep.getKey());
assertEquals(nextStepKey, thirdStep.getNextStepKey());
}

{
Expand All @@ -81,21 +93,61 @@ public void testMigrateActionsConfiguresTierPreference() {
MigrateAction action = new MigrateAction();
{
List<Step> steps = action.toSteps(null, HOT_PHASE, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(1);
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(firstStep.getSettings()),
is(DATA_HOT));
}
{
List<Step> steps = action.toSteps(null, WARM_PHASE, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(1);
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(firstStep.getSettings()),
is(DATA_WARM + "," + DATA_HOT));
}
{
List<Step> steps = action.toSteps(null, COLD_PHASE, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(1);
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(firstStep.getSettings()),
is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
}
}

public void testSkipMigrateAction() {
IndexMetadata snappedIndex = IndexMetadata.builder("snapped_index")
.settings(
Settings.builder()
.put(LifecycleSettings.SNAPSHOT_INDEX_NAME, "snapped")
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
)
.build();

IndexMetadata regularIndex = IndexMetadata.builder("regular_index")
.settings(
Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
)
.build();

{
// migrate action is not skipped if the index is not a searchable snapshot
Arrays.asList(HOT_PHASE, WARM_PHASE, COLD_PHASE, FROZEN_PHASE)
.forEach(phase -> assertThat(skipMigrateAction(phase, regularIndex), is(false)));
}

{
// migrate action is skipped if the index is a searchable snapshot for phases hot -> cold
Arrays.asList(HOT_PHASE, WARM_PHASE, COLD_PHASE)
.forEach(phase -> assertThat(skipMigrateAction(phase, snappedIndex), is(true)));
}

{
// migrate action is never skipped for the frozen phase
assertThat(skipMigrateAction(FROZEN_PHASE, snappedIndex), is(false));
assertThat(skipMigrateAction(FROZEN_PHASE, regularIndex), is(false));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,14 @@ public void testShouldMigrateDataToTiers() {
Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions);
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false));
}

{
// test phase defines a `searchable_snapshot` action
Map<String, LifecycleAction> actions = new HashMap<>();
actions.put(TEST_SEARCHABLE_SNAPSHOT_ACTION.getWriteableName(), TEST_SEARCHABLE_SNAPSHOT_ACTION);
Phase phase = new Phase(COLD_PHASE, TimeValue.ZERO, actions);
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false));
}
}

private void assertNextActionName(String phaseName, String currentAction, String expectedNextAction, String... availableActionNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.ilm.DeleteAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
import org.elasticsearch.xpack.core.ilm.FreezeAction;
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.MigrateAction;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
Expand All @@ -39,6 +41,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -558,4 +561,62 @@ public void testSecondSearchableSnapshotChangesRepo() throws Exception {
((List<Object>) responseMap.get("responses")).get(0)).get("snapshots")).size(), equalTo(1));
}
}

public void testSearchableSnapshotActionOverridesMigrateAction() throws Exception {
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
createPolicy(client(), policy,
new Phase("hot", TimeValue.ZERO, Map.of(RolloverAction.NAME, new RolloverAction(null, null, 1L),
SearchableSnapshotAction.NAME, new SearchableSnapshotAction(
snapshotRepo, randomBoolean(), MountSearchableSnapshotRequest.Storage.FULL_COPY))
),
new Phase("warm", TimeValue.ZERO, Map.of(MigrateAction.NAME, new MigrateAction(true))),
// this time transition condition will make sure we catch ILM in the warm phase so we can assert the warm migrate action
// didn't re-configure the tier allocation settings set by the searchable_snapshot action in the hot phase
// we'll use the origination date to kick off ILM to complete the policy
new Phase("cold", TimeValue.timeValueDays(5L), Map.of(MigrateAction.NAME, new MigrateAction(true))),
new Phase("frozen", TimeValue.ZERO, Map.of(MigrateAction.NAME, new MigrateAction(true))),
null
);

createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream,
new Template(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
.build(), null, null)
);

indexDocument(client(), dataStream, true);
String firstGenIndex = DataStream.getDefaultBackingIndexName(dataStream, 1L);
Map<String, Object> indexSettings = getIndexSettingsAsMap(firstGenIndex);
assertThat(indexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER), is("data_hot"));

// rollover the data stream so searchable_snapshot can complete
rolloverMaxOneDocCondition(client(), dataStream);

final String restoredIndex = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + firstGenIndex;
assertBusy(() -> {
logger.info("--> waiting for [{}] to exist...", restoredIndex);
assertTrue(indexExists(restoredIndex));
}, 30, TimeUnit.SECONDS);
assertBusy(() -> assertThat(getStepKeyForIndex(client(), restoredIndex), is(PhaseCompleteStep.finalStep("warm").getKey())),
30, TimeUnit.SECONDS);

Map<String, Object> warmIndexSettings = getIndexSettingsAsMap(restoredIndex);
// the warm phase shouldn't have changed the data_cold -> data_hot configuration
assertThat(warmIndexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER),
is("data_cold,data_warm,data_hot"));

// make the index 100 days old so the cold phase transition timing passes
updateIndexSettings(restoredIndex, Settings.builder().put(LifecycleSettings.LIFECYCLE_ORIGINATION_DATE,
ZonedDateTime.now().toInstant().toEpochMilli() - TimeValue.timeValueDays(100).getMillis()));

// let's wait for ILM to finish
assertBusy(() -> assertThat(getStepKeyForIndex(client(), restoredIndex), is(PhaseCompleteStep.finalStep("frozen").getKey())));

Map<String, Object> frozenIndexSettings = getIndexSettingsAsMap(restoredIndex);
// the frozen phase should've reconfigured the allocation preference
assertThat(frozenIndexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER),
is("data_frozen,data_cold,data_warm,data_hot"));
}

}