diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index 154acb8c06d83..6ab68c7b75f83 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -12,7 +12,12 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; @@ -35,6 +40,7 @@ import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; @@ -64,6 +70,7 @@ import org.elasticsearch.health.node.DslErrorInfo; import 
org.elasticsearch.health.node.FetchHealthInfoCacheAction; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; @@ -71,6 +78,7 @@ import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; @@ -86,9 +94,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.UnaryOperator; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY; @@ -117,6 +128,7 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase { private static final Logger logger = LogManager.getLogger(DataStreamLifecycleServiceIT.class); + private static final String DEFAULT_REPO = "my-repo"; @Override protected Collection> nodePlugins() { @@ -895,7 +907,7 @@ public void testReenableDataStreamLifecycle() throws Exception { public void testLifecycleAppliedToFailureStore() throws Exception { DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.failuresLifecycleBuilder() - .dataRetention(TimeValue.timeValueSeconds(20)) + .dataRetention(TimeValue.timeValueMinutes(20)) .buildTemplate(); putComposableIndexTemplate("id1", """ @@ -937,17 +949,27 @@ public void testLifecycleAppliedToFailureStore() throws Exception { ByteSizeValue targetFloor = 
DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(clusterSettings); assertBusy(() -> { - GetSettingsRequest getSettingsRequest = new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(firstGenerationIndex) - .includeDefaults(true); - GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet(); - assertThat( - getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()), - is(targetFactor.toString()) - ); - assertThat( - getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()), - is(targetFloor.getStringRep()) - ); + try { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(firstGenerationIndex) + .includeDefaults(true); + GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet(); + assertThat( + getSettingsResponse.getSetting( + firstGenerationIndex, + MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey() + ), + is(targetFactor.toString()) + ); + assertThat( + getSettingsResponse.getSetting( + firstGenerationIndex, + MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey() + ), + is(targetFloor.getStringRep()) + ); + } catch (IndexNotFoundException e) { + fail("expected index " + firstGenerationIndex + " to exist but it did not."); + } }); updateFailureStoreConfiguration(dataStreamName, true, TimeValue.timeValueSeconds(1)); @@ -967,7 +989,84 @@ public void testLifecycleAppliedToFailureStore() throws Exception { List retrievedFailureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices(); assertThat(retrievedFailureIndices.size(), equalTo(1)); assertThat(retrievedFailureIndices.get(0).getName(), equalTo(secondGenerationIndex)); - }); + }, 30, TimeUnit.SECONDS); + } + + public void testCollectAndMarkIndicesForFrozen() 
throws Exception { + assumeTrue("requires feature flag to be enabled", DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()); + + client().execute( + TransportPutRepositoryAction.TYPE, + new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, DEFAULT_REPO).name(DEFAULT_REPO) + .type("fs") + .settings(Settings.builder().put("location", DEFAULT_REPO)) + ).get(); + updateClusterSettings(Settings.builder().put(RepositoriesService.DEFAULT_REPOSITORY_SETTING.getKey(), DEFAULT_REPO)); + + DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() + .frozenAfter(TimeValue.timeValueDays(1)) + .buildTemplate(); + + Iterable dataStreamLifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class); + Clock clock = Clock.systemUTC(); + AtomicLong now = new AtomicLong(clock.millis()); + dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(now::get)); + + putComposableIndexTemplate( + "mytemplate", + null, + List.of("foo*"), + Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1").build(), + null, + lifecycle, + null, + false + ); + + String dataStream = "foo-ds"; + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + dataStream + ); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + indexDocs(dataStream, randomIntBetween(10, 50)); + + // Let's verify the rollover + List backingIndices = waitForDataStreamIndices(dataStream, 2, false); + String candidateIndex = backingIndices.get(0); + + AtomicLong twoDaysLater = new AtomicLong(clock.millis() + TimeValue.timeValueDays(2).millis()); + dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(twoDaysLater::get)); + + assertBusy(() -> { + logger.info("--> checking to see if index has been marked for frozen"); + 
ClusterStateResponse resp = client().execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT)).get(); + ClusterState state = resp.getState(); + String setRepo = Optional.ofNullable(state.metadata().getProject(Metadata.DEFAULT_PROJECT_ID)) + .map(pm -> pm.index(candidateIndex)) + .map(peek(im -> logger.info("--> found index {}", candidateIndex))) + .map(im -> im.getCustomData(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)) + .map(peek(custom -> logger.info("--> index {} has custom metadata: {}", candidateIndex, custom))) + .map(meta -> meta.get(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY)) + .map(peek(repo -> logger.info("--> index {} has repo {} configured", candidateIndex, repo))) + .orElse("_unset_"); + logger.info("--> repository set to: {}", setRepo); + assertThat(setRepo, equalTo(DEFAULT_REPO)); + }, 30, TimeUnit.SECONDS); + + dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(clock::millis)); + } + + /** + * Helper for peeking Optionals + */ + UnaryOperator peek(Consumer c) { + return x -> { + c.accept(x); + return x; + }; } static void indexDocs(String dataStream, int numDocs) { diff --git a/modules/data-streams/src/main/java/module-info.java b/modules/data-streams/src/main/java/module-info.java index 0aaf79db05c41..35280353b0826 100644 --- a/modules/data-streams/src/main/java/module-info.java +++ b/modules/data-streams/src/main/java/module-info.java @@ -13,6 +13,7 @@ requires org.elasticsearch.xcontent; requires org.apache.logging.log4j; requires org.apache.lucene.core; + requires org.elasticsearch.logging; exports org.elasticsearch.datastreams.action to org.elasticsearch.server; exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java 
b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index eaf5e91c45a6a..6874544df176f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -58,6 +58,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.scheduler.SchedulerEngine; import org.elasticsearch.common.scheduler.TimeValueSchedule; @@ -65,7 +66,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleIndexExecutor; @@ -82,6 +82,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; @@ -169,6 +170,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab * This is the key for data stream lifecycle related custom index metadata. 
*/ public static final String FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY = "force_merge_completed_timestamp"; + public static final String FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY = "dlm_freeze_with"; private final Settings settings; private final Client client; private final ClusterService clusterService; @@ -190,12 +192,14 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab private final MasterServiceTaskQueue forceMergeClusterStateUpdateTaskQueue; private final MasterServiceTaskQueue swapSourceWithDownsampleIndexQueue; private final MasterServiceTaskQueue markIndexForDlmForceMergeQueue; + private final MasterServiceTaskQueue markIndicesForFrozenQueue; private volatile ByteSizeValue targetMergePolicyFloorSegment; private volatile int targetMergePolicyFactor; /** * The number of retries for a particular index and error after which DSL will emmit a signal (e.g. log statement) */ private volatile int signallingErrorRetryInterval; + private volatile String defaultRepository; /** * The following stats are tracking how the data stream lifecycle runs are performing time wise @@ -248,6 +252,7 @@ public DataStreamLifecycleService( this.signallingErrorRetryInterval = DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING.get(settings); this.rolloverConfiguration = clusterService.getClusterSettings() .get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING); + this.defaultRepository = RepositoriesService.DEFAULT_REPOSITORY_SETTING.get(settings); this.forceMergeClusterStateUpdateTaskQueue = clusterService.createTaskQueue( "data-stream-lifecycle-forcemerge-state-update", Priority.LOW, @@ -263,6 +268,11 @@ public DataStreamLifecycleService( Priority.LOW, new MarkIndexForDLMForceMergeExecutor() ); + this.markIndicesForFrozenQueue = clusterService.createTaskQueue( + "dlm-mark-index-for-frozen", + Priority.LOW, + new MarkIndicesForFrozenExecutor() + ); this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher; this.actions = 
actions; } @@ -282,6 +292,8 @@ public void init() { .addSettingsUpdateConsumer(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING, this::updateMergePolicyFloorSegment); clusterService.getClusterSettings() .addSettingsUpdateConsumer(DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING, this::updateSignallingRetryThreshold); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(RepositoriesService.DEFAULT_REPOSITORY_SETTING, this::updateDefaultRepository); } @Override @@ -387,6 +399,7 @@ private void run(ProjectState projectState) { final var project = projectState.metadata(); int affectedIndices = 0; int affectedDataStreams = 0; + final Set indicesForFrozenConversion = new HashSet<>(); for (DataStream dataStream : project.dataStreams().values()) { clearErrorStoreForUnmanagedIndices(project, dataStream); var dataLifecycleEnabled = dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled(); @@ -473,6 +486,32 @@ private void run(ProjectState projectState) { ); } + try { + if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { + // Collect all candidates for conversion to a frozen index. + // These will be processed at the end of the loop where we mark all the indices at once. 
+ Set candidatesForFrozen = candidatesForFrozen( + project, + dataStream, + nowSupplier, + getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, false) + ); + // Exclude these candidates from the rest of the run + indicesToExcludeForRemainingRun.addAll(candidatesForFrozen); + // Add them to the list to be marked for conversion + indicesForFrozenConversion.addAll(candidatesForFrozen); + } + } catch (Exception e) { + logger.warn( + () -> String.format( + Locale.ROOT, + "Data stream lifecycle failed to collect candidates for converting to frozen index for data stream [%s]", + dataStream.getName() + ), + e + ); + } + try { indicesToExcludeForRemainingRun.addAll(maybeProcessDlmActions(projectState, dataStream, indicesToExcludeForRemainingRun)); } catch (Exception e) { @@ -489,6 +528,25 @@ private void run(ProjectState projectState) { affectedIndices += indicesToExcludeForRemainingRun.size(); affectedDataStreams++; } + + try { + if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { + // Only identify and mark indices if the default repository setting is set, + // if it's entirely unset, no work could proceed, so we should just skip + // the frozen step entirely. 
+ if (Strings.hasText(defaultRepository)) { + maybeMarkIndicesForFrozen(projectState, indicesForFrozenConversion); + } else if (indicesForFrozenConversion.isEmpty() == false) { + logger.debug( + "DLM identified {} indices as candidates to convert to frozen, but no default repository is configured", + indicesForFrozenConversion.size() + ); + } + } + } catch (Exception e) { + logger.warn("Data stream lifecycle failed to mark candidates for converting to frozen index for data stream", e); + } + lastRunDuration = nowSupplier.getAsLong() - lastRunStartedAt; logger.trace( "Data stream lifecycle service ran for {} and performed operations on [{}] indices, part of [{}] data streams, in project [{}]", @@ -506,6 +564,82 @@ static String formatExecutionTime(long executionTimeMillis) { return executionTimeMillis + "ms/" + TimeValue.timeValueMillis(executionTimeMillis).toString(); } + /** + * Returns true if the index has been marked with custom metadata indicating it should be converted to a frozen index. + */ + public static boolean indexMarkedForFrozen(IndexMetadata indexMetadata) { + if (indexMetadata == null) { + return false; + } + return Optional.ofNullable(indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)) + .filter(m -> m.get(FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY) != null) + .isPresent(); + } + + /** + * Return a set of indices that are past the `frozen_after` date and are also candidates in the supplied list of available indices. 
+ */ + static Set candidatesForFrozen( + ProjectMetadata projectMetadata, + DataStream dataStream, + LongSupplier nowSupplier, + List availableIndices + ) { + if (dataStream.getDataLifecycle() == null || dataStream.getDataLifecycle().frozenAfter() == null) { + return Set.of(); + } + + TimeValue frozenAfterTime = dataStream.getDataLifecycle().frozenAfter(); + Set candidates = new HashSet<>(); + + for (Index index : dataStream.getIndicesOlderThan(projectMetadata::index, nowSupplier, frozenAfterTime, BACKING_INDICES)) { + if (availableIndices.contains(index) == false) { + // If it's not in the available candidates (where no other DLM action is working on it), then skip it + continue; + } + Optional.ofNullable(projectMetadata.index(index)) + .filter(indexMeta -> indexMarkedForFrozen(indexMeta) == false) + .ifPresent(metadata -> candidates.add(metadata.getIndex())); + } + return candidates; + } + + /** + * Mark the given indices as ready to be converted into frozen indices. If the list is empty, nothing is done. 
+ */ + public void maybeMarkIndicesForFrozen(ProjectState projectState, Set indicesForFrozenConversion) { + if (indicesForFrozenConversion.isEmpty()) { + return; + } + logger.trace( + "DLM submitting request to mark {} indices to be converted to frozen {}", + indicesForFrozenConversion.size(), + indicesForFrozenConversion.stream().map(Index::getName).toList() + ); + markIndicesForFrozenQueue.submitTask( + "dlm-mark-[" + indicesForFrozenConversion.size() + "]-indices-for-frozen", + new MarkIndicesForFrozenTask( + projectState.projectId(), + indicesForFrozenConversion, + ActionListener.wrap( + ackedResponse -> logger.info( + "DLM successfully marked {} indices as ready to be frozen: {}", + indicesForFrozenConversion.size(), + indicesForFrozenConversion.stream().map(Index::getName).toList() + ), + exception -> logger.warn( + Strings.format( + "DLM was unable to mark %s indices as ready to be frozen, it will be retried", + indicesForFrozenConversion.size() + ), + exception + ) + ) + ), + null + ); + } + /** * Processes Data Lifecycle Management (DLM) actions for the given data stream. *

@@ -1726,6 +1860,10 @@ public void updateSignallingRetryThreshold(int retryThreshold) { this.signallingErrorRetryInterval = retryThreshold; } + public void updateDefaultRepository(String defaultRepository) { + this.defaultRepository = defaultRepository; + } + private void cancelJob() { if (scheduler.get() != null) { scheduler.get().remove(LIFECYCLE_JOB_NAME); @@ -1834,6 +1972,26 @@ public ClusterState execute(BatchExecutionContext } } + /** + * Executor for marking indices for conversion to frozen + */ + public static class MarkIndicesForFrozenExecutor implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(BatchExecutionContext batchExecutionContext) { + var state = batchExecutionContext.initialState(); + for (final var taskContext : batchExecutionContext.taskContexts()) { + try { + final MarkIndicesForFrozenTask task = taskContext.getTask(); + state = task.execute(state); + taskContext.success(task); + } catch (Exception e) { + taskContext.onFailure(e); + } + } + return state; + } + } + /** * Marks the given index to be force merged for DLM by updating the cluster state with the name of the index to be force merged in the * custom metadata of the source index. This method returns immediately, but the update to the cluster state happens asynchronously and diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTask.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTask.java new file mode 100644 index 0000000000000..50ad1ae3c8946 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTask.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.lifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.Index; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.repositories.RepositoriesService; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This task takes a set of indices, and adds custom metadata to the {@link DataStreamsPlugin#LIFECYCLE_CUSTOM_INDEX_METADATA_KEY} map + * that indicates the index is ready to be converted into a frozen index. The metadata is a key-value pair of + * {@link DataStreamLifecycleService#FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY} and a String which is the currently configured + * "repositories.default_repository" setting, defined in {@link RepositoriesService#DEFAULT_REPOSITORY_SETTING}. + * + * If the default repository is not configured, the cluster state update is a no-op. 
+ */ +public class MarkIndicesForFrozenTask extends AckedBatchedClusterStateUpdateTask { + private static final Logger logger = LogManager.getLogger(MarkIndicesForFrozenTask.class); + private final Set indicesToMarkForFrozen; + private final ProjectId projectId; + + public MarkIndicesForFrozenTask(ProjectId projectId, Set indicesToMarkForFrozen, ActionListener listener) { + super(TimeValue.THIRTY_SECONDS, listener); + this.indicesToMarkForFrozen = indicesToMarkForFrozen; + this.projectId = projectId; + } + + ClusterState execute(ClusterState currentState) { + final ProjectMetadata projectMetadata = currentState.metadata().getProject(this.projectId); + if (projectMetadata == null) { + return currentState; + } + final String defaultRepository = RepositoriesService.DEFAULT_REPOSITORY_SETTING.get(currentState.getMetadata().settings()); + if (Strings.hasText(defaultRepository) == false) { + logger.debug("DLM skipping marking indices as ready for frozen conversion, because no default repository has been configured"); + return currentState; + } + + final ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(projectMetadata); + boolean changed = false; + for (Index index : indicesToMarkForFrozen) { + final IndexMetadata indexMetadata = projectMetadata.index(index); + if (indexMetadata == null) { + logger.trace( + "DLM tried to mark [{}] as ready to be converted to a frozen index, but it does not exist, ignoring", + index.getName() + ); + continue; + } + + Map existingMetadata = indexMetadata.getCustomData(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY); + Map newMetadata = new HashMap<>(); + if (existingMetadata != null) { + if (defaultRepository.equals(existingMetadata.get(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY))) { + // This index has already been marked as ready for frozen conversion, and with the same repository, so skip it + continue; + } + // Make sure we don't lose the existing metadata + 
newMetadata.putAll(existingMetadata); + } + + // Update the custom metadata with the key (dlm_freeze_with) and value of the currently configured default repository + newMetadata.put(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, defaultRepository); + final IndexMetadata updatedMetadata = IndexMetadata.builder(indexMetadata) + .putCustom(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, newMetadata) + .build(); + + projectBuilder.put(updatedMetadata, true); + changed = true; + } + + if (changed) { + return ClusterState.builder(currentState).putProjectMetadata(projectBuilder.build()).build(); + } else { + logger.debug("DLM marking {} indices as ready for frozen ended in a no-op", indicesToMarkForFrozen.size()); + return currentState; + } + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 377160f22cd2d..4e6200bce1b82 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -106,6 +106,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -2557,4 +2558,109 @@ public void testFormatExecutionTimeMinutes() { public void testFormatExecutionTimeHours() { assertThat(DataStreamLifecycleService.formatExecutionTime(5400000), equalTo("5400000ms/1.5h")); } + + public void testIndexMarkedForFrozen() { + IndexMetadata plainIndex = IndexMetadata.builder("foo") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + IndexMetadata 
indexWithOtherCustom = IndexMetadata.builder("foo") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, Map.of("foo", "bar")) + .build(); + IndexMetadata indexWithFrozenCustom = IndexMetadata.builder("foo") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .putCustom( + LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, + Map.of(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, "my-repo") + ) + .build(); + + assertFalse(DataStreamLifecycleService.indexMarkedForFrozen(plainIndex)); + assertFalse(DataStreamLifecycleService.indexMarkedForFrozen(indexWithOtherCustom)); + assertTrue(DataStreamLifecycleService.indexMarkedForFrozen(indexWithFrozenCustom)); + } + + public void testGatheringCandidatesForFrozen() { + ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); + int backingIndices = randomIntBetween(3, 10); + DataStream dataStreamWithNoFrozen = createDataStream( + builder, + "my-datastream", + backingIndices, + settings(IndexVersion.current()), + DataStreamLifecycle.DEFAULT_DATA_LIFECYCLE, + now + ); + DataStream dataStreamWithFrozen = createDataStream( + builder, + "my-datastream-with-frozen", + backingIndices, + settings(IndexVersion.current()), + DataStreamLifecycle.dataLifecycleBuilder().enabled(true).frozenAfter(TimeValue.timeValueMinutes(1)).build(), + now + ); + builder.put(dataStreamWithNoFrozen); + builder.put(dataStreamWithFrozen); + ProjectMetadata projectMetadata = builder.build(); + + assertThat( + DataStreamLifecycleService.candidatesForFrozen(projectMetadata, dataStreamWithNoFrozen, () -> now, List.of()), + equalTo(Set.of()) + ); + assertThat( + DataStreamLifecycleService.candidatesForFrozen( + projectMetadata, + dataStreamWithNoFrozen, + () -> now, + dataStreamWithNoFrozen.getIndices() + ), + equalTo(Set.of()) + ); + assertThat( + 
DataStreamLifecycleService.candidatesForFrozen(projectMetadata, dataStreamWithFrozen, () -> now, List.of()), + equalTo(Set.of()) + ); + assertThat( + DataStreamLifecycleService.candidatesForFrozen( + projectMetadata, + dataStreamWithFrozen, + () -> now, + dataStreamWithFrozen.getIndices() + ), + equalTo(Set.of()) + ); + assertThat( + DataStreamLifecycleService.candidatesForFrozen( + projectMetadata, + dataStreamWithFrozen, + () -> now + TimeValue.timeValueDays(2).millis(), + List.of() + ), + equalTo(Set.of()) + ); + Set candidates = DataStreamLifecycleService.candidatesForFrozen( + projectMetadata, + dataStreamWithFrozen, + () -> now + TimeValue.timeValueMinutes(2).millis(), + dataStreamWithFrozen.getIndices() + ); + assertThat( + new TreeSet<>(candidates.stream().map(Index::getName).toList()), + equalTo( + new TreeSet<>( + dataStreamWithFrozen.getIndices() + .subList(0, (int) dataStreamWithFrozen.getGeneration() - 1) + .stream() + .map(Index::getName) + .toList() + ) + ) + ); + } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTaskTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTaskTests.java new file mode 100644 index 0000000000000..a5aa5aea8584e --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTaskTests.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */
+
+package org.elasticsearch.datastreams.lifecycle;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
+
+/**
+ * Unit tests for {@link MarkIndicesForFrozenTask}. The tests build cluster states by hand, run
+ * {@code execute} against them and inspect the data stream lifecycle custom metadata of each index.
+ */
+public class MarkIndicesForFrozenTaskTests extends ESTestCase {
+    // Instance (not static) fields: the initializers draw fresh random values for every test instance.
+    final ProjectId PROJECT_ID = randomProjectIdOrDefault();
+    final String REPO = randomAlphaOfLength(10);
+
+    /**
+     * Runs the task over four indices and checks the resulting cluster state. The indices are: one without
+     * lifecycle custom metadata, one with unrelated lifecycle custom metadata, one already marked with a
+     * different repository and one already marked with {@link #REPO}.
+     * <ul>
+     *   <li>While the state carries no default repository setting, the very same state instance is returned.</li>
+     *   <li>Once the setting points at {@link #REPO}, every index is marked with that repository.</li>
+     *   <li>Unrelated entries in the lifecycle custom metadata survive the update.</li>
+     * </ul>
+     */
+    public void testMarkIndicesForFrozen() throws Exception {
+        IndexMetadata vanilla = backingIndexBuilder("vanilla").build();
+        IndexMetadata withOtherCustom = backingIndexBuilder("withCustom").putCustom(
+            DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
+            Map.of("other", "thing")
+        ).build();
+        IndexMetadata alreadyMarkedDifferentRepo = backingIndexBuilder("diffRepo").putCustom(
+            DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
+            Map.of(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, "other-repo")
+        ).build();
+        IndexMetadata alreadyMarkedSameRepo = backingIndexBuilder("alreadyMarked").putCustom(
+            DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
+            Map.of(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, REPO)
+        ).build();
+        Set<Index> allIndices = Set.of(
+            vanilla.getIndex(),
+            withOtherCustom.getIndex(),
+            alreadyMarkedDifferentRepo.getIndex(),
+            alreadyMarkedSameRepo.getIndex()
+        );
+
+        ProjectMetadata projectMetadata = ProjectMetadata.builder(PROJECT_ID)
+            .put(vanilla, true)
+            .put(withOtherCustom, true)
+            .put(alreadyMarkedDifferentRepo, true)
+            .put(alreadyMarkedSameRepo, true)
+            .build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadata).build();
+        MarkIndicesForFrozenTask task = new MarkIndicesForFrozenTask(PROJECT_ID, allIndices, ActionListener.noop());
+
+        ClusterState newState = task.execute(state);
+        // No change, there is no default repo
+        assertThat(newState, sameInstance(state));
+
+        // Add the repo to the settings
+        state = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .persistentSettings(Settings.builder().put(RepositoriesService.DEFAULT_REPOSITORY_SETTING.getKey(), REPO).build())
+                    .build()
+            )
+            .putProjectMetadata(projectMetadata)
+            .build();
+
+        newState = task.execute(state);
+        // Every index handed to the task must now point at REPO, whatever it carried before.
+        for (Index index : allIndices) {
+            assertIndexMarked(newState, index.getName());
+        }
+
+        // Marking must add the repository entry next to the existing custom entries, not replace them.
+        assertThat(
+            getCustomData(newState, withOtherCustom.getIndex().getName()),
+            equalTo(Map.of("other", "thing", DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, REPO))
+        );
+    }
+
+    /**
+     * Starts the metadata of the first backing index of {@code dataStreamName}: current index version,
+     * one shard, one replica and a random creation date. Callers add custom metadata as needed and build.
+     */
+    private IndexMetadata.Builder backingIndexBuilder(String dataStreamName) {
+        return IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .creationDate(randomNonNegativeLong());
+    }
+
+    /**
+     * Asserts that {@code indexName} exists in the test project of {@code state}, is reported as marked by
+     * {@link DataStreamLifecycleService#indexMarkedForFrozen} and names {@link #REPO} as its repository.
+     */
+    private void assertIndexMarked(ClusterState state, String indexName) {
+        IndexMetadata indexMetadata = state.metadata().getProject(PROJECT_ID).index(indexName);
+        // Looked up once and reused for both failure messages and for the repository check.
+        Map<String, String> customData = getCustomData(state, indexName);
+        assertTrue(
+            "expected " + indexName + " to be marked as ready for frozen, but its custom data was: " + customData,
+            indexMetadata != null && DataStreamLifecycleService.indexMarkedForFrozen(indexMetadata)
+        );
+        assertThat(
+            "expected " + indexName + " to have " + REPO + " as its repository, but it has: " + customData,
+            customData.getOrDefault(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, "_unset_"),
+            equalTo(REPO)
+        );
+    }
+
+    /**
+     * Returns the data stream lifecycle custom metadata of {@code indexName} in the test project, or an
+     * empty map when the index is missing or carries no such metadata.
+     */
+    private Map<String, String> getCustomData(ClusterState state, String indexName) {
+        return Optional.ofNullable(state.metadata().getProject(PROJECT_ID).index(indexName))
+            .map(im -> im.getCustomData(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY))
+            .orElse(Map.of());
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index c12670f024d72..cdfdba82b0531 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -921,7 +921,7 @@ public static List waitForDataStreamIndices(String dataStreamName, int e } return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize; }); - safeAwait(listener); + safeAwait(listener, TimeValue.timeValueSeconds(30)); final var backingIndexNames = getDataStreamBackingIndexNames(dataStreamName,
failureStore); assertEquals( Strings.format(