Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugin/dlm-frozen-transition/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ apply plugin: 'elasticsearch.internal-cluster-test'
esplugin {
name = 'dlm-frozen-transition'
description = 'A plugin for the frozen tier functionality of DLM'
classname = 'org.elasticsearch.xpack.dlm.frozen.DlmFrozenTransitionPlugin'
classname = 'org.elasticsearch.xpack.dlm.frozen.DLMFrozenTransitionPlugin'
extendedPlugins = ['data-streams', 'x-pack-core']
}
base {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.license.internal.XPackLicenseStatus;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.junit.Before;

import java.nio.file.Path;
import java.time.Clock;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
Expand All @@ -37,14 +44,15 @@
import static org.hamcrest.Matchers.is;

/**
* Integration tests for {@link DataStreamLifecycleConvertToFrozen#maybeMarkIndexReadOnly()}.
* Integration tests for {@link DLMConvertToFrozen#maybeMarkIndexReadOnly()}.
* These tests run against a real internal cluster and verify that the write block is properly
* added to (or skipped for) real indices, including under node failure scenarios.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DataStreamLifecycleConvertToFrozenMarkReadOnlyIT extends ESIntegTestCase {
public class DLMConvertToFrozenMarkReadOnlyIT extends ESIntegTestCase {

private static final String INDEX_NAME = "test-convert-to-frozen-mark-readonly";
private static final String REPO_NAME = "test-repo";
private XPackLicenseState licenseState;

private static void assertIndexWriteBlock(boolean expected) {
Expand Down Expand Up @@ -86,20 +94,22 @@ public void setupLicense() {
* Tests that calling maybeMarkIndexReadOnly on an index without a write block
* successfully adds the WRITE block to the index.
*/
public void testMarkIndexReadOnlyAddsWriteBlock() {
public void testMarkIndexReadOnlyAddsWriteBlock() throws Exception {
internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);
setupRepoAndIndexMetadata();

// verify the index does not have a WRITE block before calling the method
assertIndexWriteBlock(false);

DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
internalCluster().clusterService(),
licenseState
licenseState,
Clock.systemUTC()
);

converter.maybeMarkIndexReadOnly();
Expand All @@ -113,22 +123,24 @@ public void testMarkIndexReadOnlyAddsWriteBlock() {
* Tests that an index with a non-WRITE block (e.g., READ) still gets a WRITE block
* added by maybeMarkIndexReadOnly, since only WRITE blocks are checked.
*/
public void testMarkIndexReadOnlyAddsWriteBlockEvenWhenReadBlockExists() {
public void testMarkIndexReadOnlyAddsWriteBlockEvenWhenReadBlockExists() throws Exception {
internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);
setupRepoAndIndexMetadata();

// Add a READ block (not WRITE)
AddIndexBlockRequest addReadBlockRequest = new AddIndexBlockRequest(READ, INDEX_NAME);
assertAcked(client().execute(TransportAddIndexBlockAction.TYPE, addReadBlockRequest).actionGet());
assertIndexWriteBlock(false);

DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
internalCluster().clusterService(),
licenseState
licenseState,
Clock.systemUTC()
);

// Should still add the WRITE block since the existing block is READ, not WRITE
Expand All @@ -143,7 +155,7 @@ public void testMarkIndexReadOnlyAddsWriteBlockEvenWhenReadBlockExists() {
* Tests that calling maybeMarkIndexReadOnly on an index that was created with
* multiple shards successfully adds the WRITE block.
*/
public void testMarkIndexReadOnlyOnMultiShardIndex() {
public void testMarkIndexReadOnlyOnMultiShardIndex() throws Exception {
internalCluster().startNode();
int numShards = randomIntBetween(2, 5);
createIndex(
Expand All @@ -154,14 +166,16 @@ public void testMarkIndexReadOnlyOnMultiShardIndex() {
.build()
);
ensureGreen(INDEX_NAME);
setupRepoAndIndexMetadata();
assertIndexWriteBlock(false);

DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
internalCluster().clusterService(),
licenseState
licenseState,
Clock.systemUTC()
);

converter.maybeMarkIndexReadOnly();
Expand All @@ -177,19 +191,21 @@ public void testMarkIndexReadOnlyOnMultiShardIndex() {
* making any cluster state changes (verified by checking that the index metadata
* settings version and cluster state version remain unchanged after the second call).
*/
public void testMarkIndexReadOnlyCalledTwiceSuccessfully() {
public void testMarkIndexReadOnlyCalledTwiceSuccessfully() throws Exception {
internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);
setupRepoAndIndexMetadata();

ClusterService clusterService = internalCluster().clusterService();

DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
clusterService,
licenseState
licenseState,
Clock.systemUTC()
);

// First call adds the block
Expand All @@ -204,12 +220,13 @@ public void testMarkIndexReadOnlyCalledTwiceSuccessfully() {
long clusterStateVersionAfterFirstCall = stateAfterFirstCall.version();

// Second converter uses the same ClusterService which always returns the latest state
DataStreamLifecycleConvertToFrozen converter2 = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter2 = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
clusterService,
licenseState
licenseState,
Clock.systemUTC()
);

// Second call should be idempotent (skips since block is already present)
Expand Down Expand Up @@ -237,7 +254,7 @@ public void testMarkIndexReadOnlyCalledTwiceSuccessfully() {
* shard-level block verification step fails due to a transport-level failure.
* Uses MockTransportService to inject an exception at the shard verification action.
*/
public void testMarkIndexReadOnlyThrowsOnShardVerificationFailure() {
public void testMarkIndexReadOnlyThrowsOnShardVerificationFailure() throws Exception {
internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(2);
Expand All @@ -247,23 +264,23 @@ public void testMarkIndexReadOnlyThrowsOnShardVerificationFailure() {
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen(INDEX_NAME);
setupRepoAndIndexMetadata();

// Inject a transport-level failure on the shard verification step
MockTransportService dataNodeTransport = MockTransportService.getInstance(dataNode);
dataNodeTransport.addRequestHandlingBehavior(
TransportVerifyShardIndexBlockAction.TYPE.name(),
(handler, request, channel, task) -> {
channel.sendResponse(new ElasticsearchException("simulated shard verification failure"));
}
(handler, request, channel, task) -> channel.sendResponse(new ElasticsearchException("simulated shard verification failure"))
);

try {
DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
internalCluster().clusterService(),
licenseState
licenseState,
Clock.systemUTC()
);

expectThrows(ElasticsearchException.class, converter::maybeMarkIndexReadOnly);
Expand All @@ -285,19 +302,21 @@ public void testMarkIndexReadOnlyThrowsWhenIndexDeletedBeforeCall() {
);
ensureGreen(INDEX_NAME);

DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
internalCluster().clusterService(),
licenseState
licenseState,
Clock.systemUTC()
);

// Delete the index after the converter is constructed but before calling maybeMarkIndexReadOnly
assertAcked(indicesAdmin().prepareDelete(INDEX_NAME));

// checkIfEligibleForConvertToFrozen detects the missing index and throws IndexNotFoundException
ElasticsearchException exception = expectThrows(ElasticsearchException.class, converter::maybeMarkIndexReadOnly);
assertThat(exception.getMessage(), containsString("DLM unable to mark index"));
assertThat(exception.getMessage(), containsString("no such index"));
}

/**
Expand All @@ -316,14 +335,16 @@ public void testWriteBlockPersistsAfterMasterFailover() throws Exception {
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen(INDEX_NAME);
setupRepoAndIndexMetadata();
assertIndexWriteBlock(false);

DataStreamLifecycleConvertToFrozen converter = new DataStreamLifecycleConvertToFrozen(
DLMConvertToFrozen converter = new DLMConvertToFrozen(
INDEX_NAME,
Metadata.DEFAULT_PROJECT_ID,
client(),
internalCluster().clusterService(),
licenseState
licenseState,
Clock.systemUTC()
);

converter.maybeMarkIndexReadOnly();
Expand All @@ -339,4 +360,34 @@ public void testWriteBlockPersistsAfterMasterFailover() throws Exception {
assertIndexWriteBlock(true);
assertIndexVerifiedReadOnly();
}

/**
 * Prepares the test index so that {@link DLMConvertToFrozen#checkIfEligibleForConvertToFrozen()}
 * succeeds: registers an {@code fs} snapshot repository named {@code REPO_NAME} and stamps the
 * index with the lifecycle custom metadata
 * ({@code data_stream_lifecycle -> dlm_freeze_with -> REPO_NAME}) pointing at that repository.
 */
private void setupRepoAndIndexMetadata() {
    // Register the snapshot repository the frozen-transition eligibility check looks for.
    assertAcked(
        clusterAdmin().preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, REPO_NAME)
            .setType("fs")
            .setSettings(Settings.builder().put("location", randomRepoPath()))
    );

    // Attach the required custom metadata to the index and publish the resulting
    // cluster state through the elected master's master service.
    ClusterService masterService = internalCluster().clusterService(internalCluster().getMasterName());
    ClusterState state = masterService.state();
    ProjectMetadata project = state.metadata().getProject(Metadata.DEFAULT_PROJECT_ID);
    IndexMetadata withCustom = IndexMetadata.builder(project.index(INDEX_NAME))
        .putCustom(
            DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
            Map.of(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, REPO_NAME)
        )
        .build();
    ClusterState updated = ClusterState.builder(state)
        .putProjectMetadata(ProjectMetadata.builder(project).put(withCustom, true).build())
        .build();
    ClusterServiceUtils.setState(masterService.getMasterService(), updated);
}
}
Loading
Loading