Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -21,15 +20,13 @@
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.DataTier;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.FROZEN_PHASE;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
* {@link org.elasticsearch.xpack.core.DataTier}s.
Expand Down Expand Up @@ -103,40 +100,34 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey migrationKey = new StepKey(phase, NAME, NAME);
StepKey migrationRoutedKey = new StepKey(phase, NAME, DataTierMigrationRoutedStep.NAME);

Settings.Builder migrationSettings = Settings.builder();
String targetTier = "data_" + phase;
assert DataTier.validTierName(targetTier) : "invalid data tier name:" + targetTier;

BranchingStep conditionalSkipActionStep = new BranchingStep(preMigrateBranchingKey, migrationKey, nextStepKey,
(index, clusterState) -> {
if (skipMigrateAction(phase, clusterState.metadata().index(index))) {
String policyName =
LifecycleSettings.LIFECYCLE_NAME_SETTING.get(clusterState.metadata().index(index).getSettings());
logger.debug("[{}] action is configured for index [{}] in policy [{}] which is already mounted as a searchable " +
"snapshot. skipping this action", MigrateAction.NAME, index.getName(), policyName);
Settings indexSettings = clusterState.metadata().index(index).getSettings();

// partially mounted indices will already have data_frozen, and we don't want to change that if they do
if (SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(indexSettings)) {
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
logger.debug("[{}] action in policy [{}] is configured for index [{}] which is a partially mounted index. " +
"skipping this action", MigrateAction.NAME, policyName, index.getName());
return true;
}

// don't skip the migrate action as the index is not mounted as searchable snapshot or we're in the frozen phase
return false;
});
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(targetTier));
UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client,
migrationSettings.build());
Settings.builder()
.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(targetTier))
.build());
DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey);
return Arrays.asList(conditionalSkipActionStep, updateMigrationSettingStep, migrationRoutedStep);
return List.of(conditionalSkipActionStep, updateMigrationSettingStep, migrationRoutedStep);
} else {
return List.of();
}
}

static boolean skipMigrateAction(String phase, IndexMetadata indexMetadata) {
// if the index is a searchable snapshot we skip the migrate action (as mounting an index as searchable snapshot
// configures the tier allocation preference), unless we're in the frozen phase
return (indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME) != null)
&& (phase.equals(FROZEN_PHASE) == false);
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.DataTier;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

Expand Down Expand Up @@ -101,10 +103,14 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
indexName = snapshotIndexName;
}

Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString());
// if we are mounting a searchable snapshot in the hot phase, then the index should be pinned to the hot nodes
if (TimeseriesLifecycleType.HOT_PHASE.equals(this.getKey().getPhase())) {
settingsBuilder.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, DataTier.DATA_HOT);
}
final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest(mountedIndexName,
snapshotRepository, snapshotName, indexName, Settings.builder()
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
.build(),
snapshotRepository, snapshotName, indexName, settingsBuilder.build(),
// we captured the index metadata when we took the snapshot. the index likely had the ILM execution state in the metadata.
// if we were to restore the lifecycle.name setting, the restored index would be captured by the ILM runner and,
// depending on what ILM execution state was captured at snapshot time, make it's way forward from _that_ step forward in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,20 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.skipMigrateAction;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.FROZEN_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.WARM_PHASE;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -115,44 +106,4 @@ public void testMigrateActionsConfiguresTierPreference() {
is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
}
}

public void testSkipMigrateAction() {
IndexMetadata snappedIndex = IndexMetadata.builder("snapped_index")
.settings(
Settings.builder()
.put(LifecycleSettings.SNAPSHOT_INDEX_NAME, "snapped")
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
)
.build();

IndexMetadata regularIndex = IndexMetadata.builder("regular_index")
.settings(
Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
)
.build();

{
// migrate action is not skipped if the index is not a searchable snapshot
Arrays.asList(HOT_PHASE, WARM_PHASE, COLD_PHASE, FROZEN_PHASE)
.forEach(phase -> assertThat(skipMigrateAction(phase, regularIndex), is(false)));
}

{
// migrate action is skipped if the index is a searchable snapshot for phases hot -> cold
Arrays.asList(HOT_PHASE, WARM_PHASE, COLD_PHASE)
.forEach(phase -> assertThat(skipMigrateAction(phase, snappedIndex), is(true)));
}

{
// migrate action is never skipped for the frozen phase
assertThat(skipMigrateAction(FROZEN_PHASE, snappedIndex), is(false));
assertThat(skipMigrateAction(FROZEN_PHASE, regularIndex), is(false));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exceptio
TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")
public void testDeleteActionDeletesSearchableSnapshot() throws Exception {
@SuppressWarnings("unchecked")
public void testDeleteActionDeletesSearchableSnapshot() throws Exception {
createSnapshotRepo(client(), snapshotRepo, randomBoolean());

// create policy with cold and delete phases
Expand Down Expand Up @@ -198,21 +198,21 @@ public void testDeleteActionDeletesSearchableSnapshot() throws Exception {
assertBusy(() -> assertFalse(indexExists(restoredIndexName)), 60, TimeUnit.SECONDS);

assertTrue("the snapshot we generate in the cold phase should be deleted by the delete phase", waitUntil(() -> {
try {
Request getSnapshotsRequest = new Request("GET", "_snapshot/" + snapshotRepo + "/_all");
Response getSnapshotsResponse = client().performRequest(getSnapshotsRequest);

Map<String, Object> responseMap;
try (InputStream is = getSnapshotsResponse.getEntity().getContent()) {
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
List<Object> responses = (List<Object>) responseMap.get("responses");
Object snapshots = ((Map<String, Object>) responses.get(0)).get("snapshots");
return ((List<Map<String, Object>>) snapshots).size() == 0;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
try {
Request getSnapshotsRequest = new Request("GET", "_snapshot/" + snapshotRepo + "/_all");
Response getSnapshotsResponse = client().performRequest(getSnapshotsRequest);

Map<String, Object> responseMap;
try (InputStream is = getSnapshotsResponse.getEntity().getContent()) {
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
List<Object> responses = (List<Object>) responseMap.get("responses");
Object snapshots = ((Map<String, Object>) responses.get(0)).get("snapshots");
return ((List<Map<String, Object>>) snapshots).size() == 0;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
}, 30, TimeUnit.SECONDS));
}

Expand Down Expand Up @@ -478,6 +478,45 @@ public void testSecondSearchableSnapshotUsingDifferentRepoThrows() throws Except
containsString("policy specifies [searchable_snapshot] action multiple times with differing repositories"));
}

public void testSearchableSnapshotsInHotPhasePinnedToHotNodes() throws Exception {
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
createPolicy(client(), policy,
new Phase("hot", TimeValue.ZERO, Map.of(RolloverAction.NAME, new RolloverAction(null, null, null, 1L),
SearchableSnapshotAction.NAME, new SearchableSnapshotAction(
snapshotRepo, randomBoolean()))
),
null, null, null, null
);

createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream,
new Template(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
.build(), null, null)
);

indexDocument(client(), dataStream, true);
String firstGenIndex = DataStream.getDefaultBackingIndexName(dataStream, 1L);
Map<String, Object> indexSettings = getIndexSettingsAsMap(firstGenIndex);
assertThat(indexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER), is("data_hot"));

// rollover the data stream so searchable_snapshot can complete
rolloverMaxOneDocCondition(client(), dataStream);

final String restoredIndex = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + firstGenIndex;
assertBusy(() -> {
logger.info("--> waiting for [{}] to exist...", restoredIndex);
assertTrue(indexExists(restoredIndex));
}, 30, TimeUnit.SECONDS);
assertBusy(() -> assertThat(getStepKeyForIndex(client(), restoredIndex), is(PhaseCompleteStep.finalStep("hot").getKey())),
30, TimeUnit.SECONDS);

Map<String, Object> hotIndexSettings = getIndexSettingsAsMap(restoredIndex);
// searchable snapshots mounted in the hot phase should be pinned to hot nodes
assertThat(hotIndexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER),
is("data_hot"));
}

public void testSearchableSnapshotActionOverridesMigrateAction() throws Exception {
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
createPolicy(client(), policy,
Expand All @@ -486,11 +525,7 @@ SearchableSnapshotAction.NAME, new SearchableSnapshotAction(
snapshotRepo, randomBoolean()))
),
new Phase("warm", TimeValue.ZERO, Map.of(MigrateAction.NAME, new MigrateAction(true))),
// this time transition condition will make sure we catch ILM in the warm phase so we can assert the warm migrate action
// didn't re-configure the tier allocation settings set by the searchable_snapshot action in the hot phase
// we'll use the origination date to kick off ILM to complete the policy
new Phase("cold", TimeValue.timeValueDays(5L), Map.of(MigrateAction.NAME, new MigrateAction(true))),
null, null
null, null, null
);

createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream,
Expand All @@ -503,6 +538,7 @@ snapshotRepo, randomBoolean()))
indexDocument(client(), dataStream, true);
String firstGenIndex = DataStream.getDefaultBackingIndexName(dataStream, 1L);
Map<String, Object> indexSettings = getIndexSettingsAsMap(firstGenIndex);
// the searchable snapshot is allocated to the hot tier
assertThat(indexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER), is("data_hot"));

// rollover the data stream so searchable_snapshot can complete
Expand All @@ -517,21 +553,8 @@ snapshotRepo, randomBoolean()))
30, TimeUnit.SECONDS);

Map<String, Object> warmIndexSettings = getIndexSettingsAsMap(restoredIndex);
// the warm phase shouldn't have changed the data_cold -> data_hot configuration
assertThat(warmIndexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER),
is("data_cold,data_warm,data_hot"));

// make the index 100 days old so the cold phase transition timing passes
updateIndexSettings(restoredIndex, Settings.builder().put(LifecycleSettings.LIFECYCLE_ORIGINATION_DATE,
ZonedDateTime.now().toInstant().toEpochMilli() - TimeValue.timeValueDays(100).getMillis()));

// let's wait for ILM to finish
assertBusy(() -> assertThat(getStepKeyForIndex(client(), restoredIndex), is(PhaseCompleteStep.finalStep("cold").getKey())));

Map<String, Object> coldIndexSettings = getIndexSettingsAsMap(restoredIndex);
// the frozen phase should've reconfigured the allocation preference
assertThat(coldIndexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER),
is("data_cold,data_warm,data_hot"));
// the searchable snapshot continues on the to warm tier and onward
assertThat(warmIndexSettings.get(DataTierAllocationDecider.INDEX_ROUTING_PREFER), is("data_warm,data_hot"));
}

}