diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 56fa0245223a4..41992f73378c7 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -60,9 +60,6 @@ import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.steps.ForceMergeStep; import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction; import org.elasticsearch.datastreams.lifecycle.transitions.steps.TransportMarkIndexForDLMForceMergeAction; import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction; @@ -186,9 +183,6 @@ public List> getSettings() { pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING); - if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { - pluginSettings.add(ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING); - } return pluginSettings; } @@ -217,11 +211,6 @@ public Collection createComponents(PluginServices services) { ) ); - // Register DLM actions here. Order matters - they will be executed in the order they are listed for a given index. - List dlmActions = List.of(); - - verifyActions(dlmActions); - dataLifecycleInitialisationService.set( new DataStreamLifecycleService( settings, @@ -233,8 +222,7 @@ public Collection createComponents(PluginServices services) { errorStoreInitialisationService.get(), services.allocationService(), dataStreamLifecycleErrorsPublisher.get(), - services.dataStreamGlobalRetentionSettings(), - dlmActions + services.dataStreamGlobalRetentionSettings() ) ); dataLifecycleInitialisationService.get().init(); @@ -246,26 +234,6 @@ public Collection createComponents(PluginServices services) { return components; } - // visible for testing - static void verifyActions(List dlmActions) { - for (DlmAction action : dlmActions) { - if (action.steps().isEmpty()) { - throw new IllegalStateException("DLM action [" + action.name() + "] must have at least one step"); - } - for (DlmStep step : action.steps()) { - if (step.possibleOutputIndexNamePatterns("dummy-index").isEmpty()) { - throw new IllegalStateException( - "DLM step [" - + step.stepName() - + "] in action [" - + action.name() - + "] must have at least one possible output index name pattern" - ); - } - } - } - } - @Override public List getActions() { List actions = new ArrayList<>(); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 6874544df176f..f121d858360e2 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -71,10 +71,6 @@ import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleIndexExecutor; import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleToDS; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmActionContext; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; @@ -105,7 +101,6 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.ALL; import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.BACKING_INDICES; import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.FAILURE_INDICES; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; @@ -179,7 +174,6 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab final ResultDeduplicator, Void> clusterStateChangesDeduplicator; private final DataStreamLifecycleHealthInfoPublisher dslHealthInfoPublisher; private final DataStreamGlobalRetentionSettings globalRetentionSettings; - private final List actions; private LongSupplier nowSupplier; private final Clock clock; private final DataStreamLifecycleErrorStore errorStore; @@ -232,8 +226,7 @@ public DataStreamLifecycleService( DataStreamLifecycleErrorStore errorStore, AllocationService allocationService, DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher, - DataStreamGlobalRetentionSettings globalRetentionSettings, - List actions + DataStreamGlobalRetentionSettings globalRetentionSettings ) { this.settings = settings; this.client = client; @@ -274,7 +267,6 @@ public DataStreamLifecycleService( new MarkIndicesForFrozenExecutor() ); this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher; - this.actions = actions; } /** @@ -512,19 +504,6 @@ private void run(ProjectState projectState) { ); } - try { - indicesToExcludeForRemainingRun.addAll(maybeProcessDlmActions(projectState, dataStream, indicesToExcludeForRemainingRun)); - } catch (Exception e) { - logger.warn( - () -> String.format( - Locale.ROOT, - "Data stream lifecycle failed to execute actions for data stream [%s]", - dataStream.getName() - ), - e - ); - } - affectedIndices += indicesToExcludeForRemainingRun.size(); affectedDataStreams++; } @@ -640,232 +619,6 @@ public void maybeMarkIndicesForFrozen(ProjectState projectState, Set indi ); } - /** - * Processes Data Lifecycle Management (DLM) actions for the given data stream. - *

- * For each configured {@link DlmAction}, this method: - * * Determines if the action is scheduled for the data stream. - * * Finds indices eligible for the action, excluding those in {@code indicesToExclude}. - * * For each eligible index, iterates through the action's steps in reverse order until finding a step that is complete or - * reaching the start of the list - * * Iterate one step forward through the list to find the first incomplete step. - * * Execute the step handling and logging any exceptions. - * * Adds the index to {@code indicesToExclude} after a step is executed to avoid reprocessing in this run. - *

- * Any errors encountered during step completion checks or execution are logged, but do not prevent processing of - * other actions or indices. - * - * @param projectState the current project state - * @param dataStream the data stream to process - * @return The set of indices processed that should be ignored by later actions / included in stats - */ - // Visible for testing - Set maybeProcessDlmActions(ProjectState projectState, DataStream dataStream, Set indicesToExclude) { - HashSet indicesProcessed = new HashSet<>(); - DlmActionContext actionContext = new DlmActionContext( - projectState, - transportActionsDeduplicator, - errorStore, - signallingErrorRetryInterval, - client, - Clock.systemUTC() - ); - for (DlmAction action : actions) { - - if (action.canRunOnProject(actionContext) == false) { - logger.trace( - "Skipping action [{}] for project [{}] as prerequisites are not met", - action.name(), - projectState != null ? projectState.projectId() : "unknown" - ); - continue; - } - - TimeValue actionSchedule = action.applyAfterTime().apply(dataStream.getDataLifecycle()); - - if (actionSchedule == null) { - logger.trace( - "Data stream lifecycle action [{}] is not scheduled for data stream [{}]", - action.name(), - dataStream.getName() - ); - continue; - } - - long actionStartTime = nowSupplier.getAsLong(); - - List indicesEligibleForAction; - if (action.appliesToFailureStore()) { - indicesEligibleForAction = dataStream.getIndicesOlderThan( - indexName -> projectState.metadata().index(indexName), - nowSupplier, - actionSchedule, - ALL - ); - } else { - indicesEligibleForAction = dataStream.getIndicesOlderThan( - indexName -> projectState.metadata().index(indexName), - nowSupplier, - actionSchedule, - BACKING_INDICES - ); - } - - indicesEligibleForAction.removeAll(indicesToExclude); - indicesEligibleForAction.removeAll(indicesProcessed); - - logger.trace( - "Data stream lifecycle action [{}] found [{}] eligible indices for data stream [{}]", - action.name(), - indicesEligibleForAction.size(), - dataStream.getName() - ); - - for (Index index : indicesEligibleForAction) { - long findStepStartTime = nowSupplier.getAsLong(); - int stepToExecuteIndex = findFirstIncompleteStepIndex(projectState, dataStream, action, index); - if (logger.isTraceEnabled()) { - long findStepDuration = nowSupplier.getAsLong() - findStepStartTime; - logger.trace( - "Finding first incomplete step for action [{}] on datastream [{}] index [{}] took [{}]", - action.name(), - dataStream.getName(), - index.getName(), - formatExecutionTime(findStepDuration) - ); - } - - if (stepToExecuteIndex >= 0) { - DlmStep stepToExecute = action.steps().get(stepToExecuteIndex); - try { - logger.trace( - "Executing step [{}] for action [{}] on datastream [{}] index [{}]", - stepToExecute.stepName(), - action.name(), - dataStream.getName(), - index.getName() - ); - long stepStartTime = nowSupplier.getAsLong(); - Index indexForExecution = resolveIndexOutputFromPreviousStep(stepToExecuteIndex, index, action, projectState); - DlmStepContext dlmStepContext = actionContext.stepContextFor(indexForExecution); - stepToExecute.execute(dlmStepContext); - if (logger.isTraceEnabled()) { - long stepDuration = nowSupplier.getAsLong() - stepStartTime; - logger.trace( - "Executed step [{}] for action [{}] on datastream [{}] index [{}] in [{}]", - stepToExecute.stepName(), - action.name(), - dataStream.getName(), - index.getName(), - formatExecutionTime(stepDuration) - ); - } - } catch (Exception ex) { - logger.warn( - logger.getMessageFactory() - .newMessage( - "Unable to execute step [{}] for action [{}] on datastream [{}] index [{}]", - stepToExecute.stepName(), - action.name(), - dataStream.getName(), - index.getName() - ), - ex - ); - continue; - } - indicesProcessed.add(index); - } - } - if (logger.isTraceEnabled()) { - long actionDuration = nowSupplier.getAsLong() - actionStartTime; - logger.trace( - "Data stream lifecycle action [{}] for data stream [{}] completed in [{}]", - action.name(), - dataStream.getName(), - formatExecutionTime(actionDuration) - ); - } - } - return indicesProcessed; - } - - private int findFirstIncompleteStepIndex(ProjectState projectState, DataStream dataStream, DlmAction action, Index index) { - assert action.steps().size() >= 1 : "an action must have at least one step"; - int stepToExecute = -1; - for (int i = action.steps().size() - 1; i >= 0; i--) { - DlmStep step = action.steps().get(i); - try { - long checkStartTime = nowSupplier.getAsLong(); - Index indexInUse = resolveIndexOutputFromPreviousStep(i, index, action, projectState); - if (step.stepCompleted(indexInUse, projectState) == false) { - stepToExecute = i; - if (logger.isTraceEnabled()) { - logger.trace( - "Step [{}] for action [{}] on datastream [{}] index [{}] is not complete, checked in [{}]", - step.stepName(), - action.name(), - dataStream.getName(), - indexInUse.getName(), - formatExecutionTime(nowSupplier.getAsLong() - checkStartTime) - ); - } - } else { - if (logger.isTraceEnabled()) { - logger.trace( - "Step [{}] for action [{}] on datastream [{}] index [{}] is already complete, checked in [{}]", - step.stepName(), - action.name(), - dataStream.getName(), - indexInUse.getName(), - formatExecutionTime(nowSupplier.getAsLong() - checkStartTime) - ); - } - break; - } - } catch (Exception ex) { - logger.warn( - logger.getMessageFactory() - .newMessage( - "Unable to execute check for step complete [{}] for action [{}] on datastream [{}] index [{}]", - step.stepName(), - action.name(), - dataStream.getName(), - index.getName() - ), - ex - ); - } - } - return stepToExecute; - } - - // visible for testing - Index resolveIndexOutputFromPreviousStep(int stepToExecuteIndex, Index index, DlmAction action, ProjectState projectState) { - if (stepToExecuteIndex < 1) { - return index; - } - - DlmStep previousStep = action.steps().get(stepToExecuteIndex - 1); - List possibleIndexNames = previousStep.possibleOutputIndexNamePatterns(index.getName()); - - return possibleIndexNames.stream() - .filter(projectState.metadata()::hasIndex) - .findFirst() - .map(possibleName -> projectState.metadata().index(possibleName).getIndex()) - .orElseGet(() -> { - assert false - : "Unable to resolve index name for executing step [" - + action.steps().get(stepToExecuteIndex).stepName() - + "] for action [" - + action.name() - + "] with index [" - + index.getName() - + "]"; - return index; - }); - } - // visible for testing static Set timeSeriesIndicesStillWithinTimeBounds(ProjectMetadata project, List targetIndices, LongSupplier nowSupplier) { Set tsIndicesWithinBounds = new HashSet<>(); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmAction.java deleted file mode 100644 index 4f6202f059743..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmAction.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions; - -import org.elasticsearch.cluster.metadata.DataStreamLifecycle; -import org.elasticsearch.core.TimeValue; - -import java.util.List; -import java.util.function.Function; - -/** - * An action within Data Lifecycle Management that consists of multiple steps to be executed in sequence. - * It represents a transition from one lifecycle phase to another, such as moving from hot to frozen storage. - */ -public interface DlmAction { - - /** - * A human-readable name for the action. - * - * @return The action name. - */ - String name(); - - /** - * A function that extracts the scheduling property ({@link TimeValue}) from the datastream's {@link DataStreamLifecycle} - * configuration. - * This ({@link TimeValue}) determines how old an index should be for this action to be triggered and the index transitioned - * - * @return A function that takes a DataStreamLifecycle and returns the scheduling TimeValue. - */ - Function applyAfterTime(); - - /** - * The ordered list of steps that make up this action that must be executed sequentially to complete the action. - * - * @return A list of DlmStep instances representing the steps of the action. - */ - List steps(); - - /** - * Indicates whether this action applies to the failure store. - * By default, actions do not apply to the failure store. - * - * @return true if the action applies to the failure store, false otherwise. - */ - default boolean appliesToFailureStore() { - return false; - } - - /** - * Determines whether this action can run on the given project. This allows actions to be skipped entirely - * if the cluster is not in a compatible state, for example if a required default snapshot repository is not - * configured or available. - * - * @param dlmActionContext The context providing access to the project state and resources. - * @return true if the action can proceed, false if it should be skipped for this project. - */ - boolean canRunOnProject(DlmActionContext dlmActionContext); -} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmActionContext.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmActionContext.java deleted file mode 100644 index 02f1e9e7c555c..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmActionContext.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions; - -import org.elasticsearch.action.ResultDeduplicator; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; -import org.elasticsearch.index.Index; -import org.elasticsearch.transport.TransportRequest; - -import java.time.Clock; - -/** - * Context and resources required for executing a DLM action. - */ -public record DlmActionContext( - ProjectState projectState, - ResultDeduplicator, Void> transportActionsDeduplicator, - DataStreamLifecycleErrorStore errorStore, - int signallingErrorRetryThreshold, - Client client, - Clock clock -) { - /** - * @return The project ID associated with this context. - */ - public ProjectId projectId() { - return projectState.projectId(); - } - - /** - * Creates a {@link DlmStepContext} for the given index, using the resources from this action context. - * - * @param index The index to create a step context for. - * @return A new {@link DlmStepContext} for the given index. - */ - public DlmStepContext stepContextFor(Index index) { - return new DlmStepContext(index, this); - } - -} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmStep.java deleted file mode 100644 index 3d41e74b565ae..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmStep.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions; - -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.index.Index; - -import java.util.List; - -/** - * A step within a Data Lifecycle Management action. Each step is responsible for determining if it has been completed for a given index - * and executing the necessary operations to complete the step. - */ -public interface DlmStep { - - /** - * Determines if the step has been completed for the given index and project state. - * - * @param index The index to check. - * @param projectState The current project state. - * @return - */ - boolean stepCompleted(Index index, ProjectState projectState); - - /** - * This method determines how to execute the step and performs the necessary operations to update the index - * so that {@link #stepCompleted(Index, ProjectState)} will return true after successful execution. - * - * @param dlmStepContext The context and resources for executing the step. - */ - void execute(DlmStepContext dlmStepContext); - - /** - * A human-readable name for the step. - * - * @return The step name. - */ - String stepName(); - - /** - * Returns a list of possible index name patterns that this step may end up creating. This is then used by later steps to - * determine the name of the index they should work on after this step is run. The order is important as the first pattern that - * matches an existing index will be used by the later step. - *
- * The default implementation returns in the steps input index name as the only possible output index name pattern, - * which is sufficient for steps that do not change the index name. - * @param indexName Index name this step ran on - * @return List of possible index name patterns that this step may end up creating - */ - default List possibleOutputIndexNamePatterns(String indexName) { - return List.of(indexName); - } -} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmStepContext.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmStepContext.java deleted file mode 100644 index 177202b32ebdb..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/DlmStepContext.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ResultDeduplicator; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; -import org.elasticsearch.datastreams.lifecycle.ErrorRecordingActionListener; -import org.elasticsearch.index.Index; -import org.elasticsearch.transport.TransportRequest; - -import java.time.Clock; -import java.util.function.BiConsumer; - -/** - * Context and resources required for executing a DLM step. - */ -public record DlmStepContext( - Index index, - ProjectState projectState, - ResultDeduplicator, Void> transportActionsDeduplicator, - DataStreamLifecycleErrorStore errorStore, - int signallingErrorRetryThreshold, - Client client, - Clock clock -) { - - /** - * Creates a step context from a {@link DlmActionContext} and an index. - * - * @param index The index this step context is for. - * @param actionContext The action context to derive common resources from. - */ - public DlmStepContext(Index index, DlmActionContext actionContext) { - this( - index, - actionContext.projectState(), - actionContext.transportActionsDeduplicator(), - actionContext.errorStore(), - actionContext.signallingErrorRetryThreshold(), - actionContext.client(), - actionContext.clock() - ); - } - - /** - * @return The name of the index associated with this context. - */ - public String indexName() { - return index.getName(); - } - - /** - * @return The project ID associated with this context. - */ - public ProjectId projectId() { - return projectState.projectId(); - } - - public void executeDeduplicatedRequest( - String actionName, - TransportRequest request, - String failureMessage, - BiConsumer, ActionListener> callback - ) { - transportActionsDeduplicator.executeOnce( - Tuple.tuple(projectId(), request), - new ErrorRecordingActionListener( - actionName, - projectId(), - indexName(), - errorStore, - failureMessage, - signallingErrorRetryThreshold - ), - callback - ); - } - - /* - * @return true if the request is in-progress (deduplicator is currently - * tracking the provided projectId, request tuple), - * false otherwise. - */ - public boolean isRequestInProgress(TransportRequest request) { - return transportActionsDeduplicator.hasRequest(Tuple.tuple(projectId(), request)); - } -} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java deleted file mode 100644 index ed2e2af56dd22..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions.steps; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; -import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; -import org.elasticsearch.action.admin.indices.shrink.ResizeType; -import org.elasticsearch.action.admin.indices.shrink.TransportResizeAction; -import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; - -import java.util.List; -import java.util.Optional; - -import static org.apache.logging.log4j.LogManager.getLogger; -import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; -import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY; - -/** - * This step clones the index into a new index with 0 replicas. - * The clone index is then marked in the custom metadata of the index metadata - * of the original index as the index to be force merged by DLM in the next step. - * If the original index already has 0 replicas, it will be marked directly for force - * merge without cloning. - * The step is completed when the clone index (or original index if it had 0 replicas) - * has all primary shards active, which means it's ready to be force merged in the next step. - */ -public class CloneStep implements DlmStep { - - private static final TimeValue CLONE_TIMEOUT = TimeValue.timeValueHours(12); - private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); - private static final Logger logger = getLogger(CloneStep.class); - public static final String CLONE_INDEX_PREFIX = "dlm-clone-"; - - @Override - public boolean stepCompleted(Index index, ProjectState projectState) { - ProjectMetadata projectMetadata = projectState.metadata(); - // the index can either be "cloned" or the original index if it had 0 replicas - return Optional.ofNullable(getIndexToBeForceMerged(index.getName(), projectState)) - .map(idx -> projectMetadata.indices().containsKey(idx) ? idx : null) - .map(idx -> projectState.routingTable().index(idx)) - .map(IndexRoutingTable::allPrimaryShardsActive) - .orElse(false); - } - - @Override - public void execute(DlmStepContext stepContext) { - Index index = stepContext.index(); - String indexName = index.getName(); - ProjectState projectState = stepContext.projectState(); - ProjectMetadata projectMetadata = projectState.metadata(); - IndexMetadata indexMetadata = projectMetadata.index(index); - - if (indexMetadata == null) { - logger.warn("Index [{}] not found in project metadata, skipping clone step", indexName); - return; - } - - if (indexMetadata.getNumberOfReplicas() == 0) { - logger.info( - "Skipping clone step for index [{}] as it already has 0 replicas and can be used for force merge directly", - indexName - ); - // mark the index to be force merged directly - maybeMarkIndexToBeForceMerged(indexName, indexName, stepContext, ActionListener.wrap(resp -> { - logger.info("DLM successfully marked index [{}] to be force merged", indexName); - }, err -> { - logger.error(() -> Strings.format("DLM failed to mark index [%s] to be force merged", indexName), err); - stepContext.errorStore().recordError(stepContext.projectId(), indexName, err); - })); - return; - } - - String cloneIndex = getDLMCloneIndexName(indexName); - if (projectMetadata.indices().containsKey(cloneIndex)) { - // Clone index exists but step not completed - check if it's been stuck for too long and clean up if so - maybeCleanUpStuckCloneTask(cloneIndex, stepContext); - return; - } - - maybeCloneIndex( - indexName, - cloneIndex, - ActionListener.wrap( - resp -> logger.info( - "DLM successfully completed clone and force merge marking of index [{}] to index [{}]", - indexName, - cloneIndex - ), - err -> { - logger.error(() -> Strings.format("DLM failed to clone index [%s] to index [%s]", indexName, cloneIndex), err); - stepContext.errorStore().recordError(stepContext.projectId(), indexName, err); - } - ), - stepContext - ); - } - - @Override - public String stepName() { - return "Clone Index"; - } - - @Override - public List possibleOutputIndexNamePatterns(String indexName) { - // The clone index name pattern should be checked before the original index name pattern - return List.of(getDLMCloneIndexName(indexName), indexName); - } - - /** - * Checks if the clone index has been stuck for too long and if so, deletes it to allow a new clone attempt. - */ - private static void maybeCleanUpStuckCloneTask(String cloneIndex, DlmStepContext stepContext) { - String indexName = stepContext.indexName(); - ProjectMetadata projectMetadata = stepContext.projectState().metadata(); - IndexMetadata cloneIndexMetadata = projectMetadata.index(cloneIndex); - if (cloneIndexMetadata == null) { - logger.debug( - "Clone index [{}] for index [{}] not found in project metadata during stuck clone check, it may have been deleted", - cloneIndex, - indexName - ); - return; - } - - long cloneCreationTime = getCloneIndexCreationTime(cloneIndex, cloneIndexMetadata, projectMetadata); - - long currentTime = stepContext.clock().millis(); - long timeSinceCreation = currentTime - cloneCreationTime; - TimeValue timeSinceCreationValue = TimeValue.timeValueMillis(timeSinceCreation); - if (isCloneIndexStuck(cloneIndexMetadata, timeSinceCreation, stepContext)) { - // Clone has been stuck for > 12 hours, clean it up so a new clone can be attempted - logger.info( - "DLM cleaning up clone index [{}] for index [{}] as it has been in progress for [{}] (raw: [{}ms]), " - + "exceeding timeout of [{}] (raw: [{}ms])", - cloneIndex, - indexName, - timeSinceCreationValue.toHumanReadableString(2), - timeSinceCreation, - CLONE_TIMEOUT.toHumanReadableString(2), - CLONE_TIMEOUT.millis() - ); - maybeDeleteCloneIndex( - stepContext, - ActionListener.wrap( - resp -> logger.info("DLM successfully cleaned up clone index [{}] for index [{}]", cloneIndex, indexName), - err -> { - logger.error( - () -> Strings.format("DLM failed to clean up clone index [%s] for index [%s]", cloneIndex, indexName), - err - ); - stepContext.errorStore().recordError(stepContext.projectId(), indexName, err); - } - ) - ); - } else { - // Clone is still fresh, wait for it to complete - logger.debug( - "DLM clone index [{}] for index [{}] exists and has been in progress for [{}] (raw: [{}ms]), " - + "waiting for completion or timeout of [{}] (raw: [{}ms])", - cloneIndex, - indexName, - timeSinceCreationValue.toHumanReadableString(2), - timeSinceCreation, - CLONE_TIMEOUT.toHumanReadableString(2), - CLONE_TIMEOUT.millis() - ); - } - } - - /** - * Determines if a clone index creation is stuck based on whether the clone request is still - * in progress or the clone shards are not active, and if the defined timeout has been breached. - */ - private static boolean isCloneIndexStuck(IndexMetadata cloneIndexMetadata, long timeSinceCreation, DlmStepContext stepContext) { - ResizeRequest cloneRequest = formCloneRequest(stepContext.indexName(), cloneIndexMetadata.getIndex().getName()); - boolean cloneShardsAreActive = Optional.ofNullable(cloneIndexMetadata.getIndex()) - .map(Index::getName) - .map(stepContext.projectState().routingTable()::index) - .map(IndexRoutingTable::allPrimaryShardsActive) - .orElse(false); - return (stepContext.isRequestInProgress(cloneRequest) || cloneShardsAreActive == false) - && timeSinceCreation > CLONE_TIMEOUT.millis(); - } - - /** - * Listener for the clone index action that will mark the cloned index to be force merged on success within the index metadata of the - * original index, or clean up the clone index on failure. - */ - private static class CloneIndexResizeActionListener implements ActionListener { - private final String originalIndex; - private final String cloneIndex; - private final ActionListener listener; - private final DlmStepContext stepContext; - - private CloneIndexResizeActionListener( - String originalIndex, - String cloneIndex, - ActionListener listener, - DlmStepContext stepContext - ) { - this.originalIndex = originalIndex; - this.cloneIndex = cloneIndex; - this.listener = listener; - this.stepContext = stepContext; - } - - @Override - public void onResponse(CreateIndexResponse createIndexResponse) { - if (createIndexResponse.isAcknowledged() == false) { - onFailure( - new ElasticsearchException( - Strings.format("DLM failed to acknowledge clone of index [%s] to index [%s]", originalIndex, cloneIndex) - ) - ); - return; - } - logger.info("DLM successfully cloned index [{}] to index [{}]", originalIndex, cloneIndex); - // on success, write the cloned index name to the custom metadata of the index metadata of original index - maybeMarkIndexToBeForceMerged(originalIndex, cloneIndex, stepContext, listener.delegateFailure((l, v) -> { - logger.info("DLM successfully marked index [{}] to be force merged for source index [{}]", cloneIndex, originalIndex); - l.onResponse(null); - })); - } - - @Override - public void onFailure(Exception e) { - logger.error(() -> Strings.format("DLM failed to clone index [%s] to index [%s]", originalIndex, cloneIndex), e); - stepContext.errorStore().recordError(stepContext.projectId(), originalIndex, e); - maybeDeleteCloneIndex(stepContext, listener.delegateFailure((l, v) -> { - logger.info( - "DLM successfully deleted clone index [{}] after failed attempt to clone index [{}]", - cloneIndex, - originalIndex - ); - l.onFailure(e); - })); - } - } - - private void maybeCloneIndex(String originalIndex, String cloneIndex, ActionListener listener, DlmStepContext stepContext) { - ResizeRequest cloneIndexRequest = formCloneRequest(originalIndex, cloneIndex); - stepContext.executeDeduplicatedRequest( - TransportResizeAction.TYPE.name(), - cloneIndexRequest, - Strings.format("DLM service encountered an error when trying to clone index [%s]", originalIndex), - (req, unused) -> cloneIndex(stepContext.projectId(), cloneIndexRequest, listener, stepContext) - ); - } - - private void cloneIndex(ProjectId projectId, ResizeRequest cloneRequest, ActionListener listener, DlmStepContext stepContext) { - assert cloneRequest.indices() != null && cloneRequest.indices().length == 1 : "DLM should clone one index at a time"; - String originalIndex = cloneRequest.getSourceIndex(); - String cloneIndex = cloneRequest.getTargetIndexRequest().index(); - logger.trace("DLM issuing request to clone index [{}] to index [{}]", originalIndex, cloneIndex); - CloneIndexResizeActionListener responseListener = new CloneIndexResizeActionListener( - originalIndex, - cloneIndex, - listener, - stepContext - ); - stepContext.client().projectClient(projectId).execute(TransportResizeAction.TYPE, cloneRequest, responseListener); - } - - /** - * Updates the custom metadata of the index metadata of the source index to mark the target index as that to be force merged by DLM. - * This method performs the update asynchronously using a transport action. - */ - private static void maybeMarkIndexToBeForceMerged( - String originalIndex, - String indexToBeForceMerged, - DlmStepContext stepContext, - ActionListener listener - ) { - MarkIndexForDLMForceMergeAction.Request markIndexForForceMergeRequest = new MarkIndexForDLMForceMergeAction.Request( - originalIndex, - indexToBeForceMerged - ); - stepContext.executeDeduplicatedRequest( - MarkIndexForDLMForceMergeAction.TYPE.name(), - markIndexForForceMergeRequest, - Strings.format( - "DLM service encountered an error when trying to mark index [%s] to be force merged for source index [%s]", - indexToBeForceMerged, - originalIndex - ), - (req, unused) -> markIndexToBeForceMerged(markIndexForForceMergeRequest, stepContext, listener) - ); - } - - private static void markIndexToBeForceMerged( - MarkIndexForDLMForceMergeAction.Request request, - DlmStepContext stepContext, - ActionListener listener - ) { - logger.debug( - "DLM marking index [{}] to be force merged for source index [{}]", - request.getIndexToBeForceMerged(), - request.getOriginalIndex() - ); - stepContext.client() - .projectClient(stepContext.projectId()) - .execute(MarkIndexForDLMForceMergeAction.TYPE, request, ActionListener.wrap(resp -> { - if (resp.isAcknowledged()) { - listener.onResponse(null); - } else { - listener.onFailure( - new ElasticsearchException( - Strings.format( - "DLM failed to acknowledge marking index [%s] to be force merged for source index [%s]", - request.getIndexToBeForceMerged(), - request.getOriginalIndex() - ) - ) - ); - } - }, listener::onFailure)); - } - - private static void maybeDeleteCloneIndex(DlmStepContext stepContext, ActionListener listener) { - String cloneIndex = getDLMCloneIndexName(stepContext.indexName()); - if (stepContext.projectState().metadata().indices().containsKey(cloneIndex) == false) { - logger.debug("Clone index [{}] does not exist, no need to delete", cloneIndex); - listener.onResponse(null); - return; - } - logger.debug("Attempting to delete index [{}]", cloneIndex); - - DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(cloneIndex).indicesOptions(IGNORE_MISSING_OPTIONS) - .masterNodeTimeout(TimeValue.MAX_VALUE); - stepContext.executeDeduplicatedRequest( - TransportDeleteIndexAction.TYPE.name(), - deleteIndexRequest, - Strings.format("DLM service encountered an error trying to delete clone index [%s]", cloneIndex), - (req, unused) -> deleteCloneIndex(deleteIndexRequest, stepContext, listener) - ); - } - - private static void deleteCloneIndex(DeleteIndexRequest deleteIndexRequest, DlmStepContext stepContext, ActionListener listener) { - String cloneIndex = deleteIndexRequest.indices()[0]; - logger.debug("DLM issuing request to delete index [{}]", cloneIndex); - stepContext.client() - .projectClient(stepContext.projectId()) - .admin() - .indices() - .delete(deleteIndexRequest, ActionListener.wrap(resp -> { - if (resp.isAcknowledged()) { - logger.debug("DLM successfully deleted clone index [{}]", cloneIndex); - listener.onResponse(null); - } else { - listener.onFailure( - new ElasticsearchException(Strings.format("Failed to acknowledge delete of index [%s]", cloneIndex)) - ); - } - }, err -> { - if (err instanceof IndexNotFoundException) { - // If the index was not found, it means it was already deleted, so we can consider this a success - logger.debug("Clone index [{}] was not found during DLM delete attempt, it may have already been deleted", cloneIndex); - listener.onResponse(null); - return; - } - logger.error(() -> Strings.format("DLM failed to delete clone index [%s]", cloneIndex), err); - listener.onFailure(err); - })); - } - - /** - * Forms a resize request to clone the source index into a new index with 0 replicas. - * @param originalIndex the index to be cloned - * @param cloneIndex the name of the new index to clone into - * @return the resize request to clone the source index into a new index with 0 replicas - */ - public static ResizeRequest formCloneRequest(String originalIndex, String cloneIndex) { - CreateIndexRequest createReq = new CreateIndexRequest(cloneIndex); - createReq.waitForActiveShards(ActiveShardCount.ALL); - ResizeRequest resizeReq = new ResizeRequest( - MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, - AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, - ResizeType.CLONE, - originalIndex, - cloneIndex - ); - resizeReq.setTargetIndex(createReq); - resizeReq.setTargetIndexSettings( - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).putNull(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS) - ); - return resizeReq; - } - - /** - * Returns the name of index to be force merged by DLM from the custom metadata of the index metadata of the source index. - * If no such index has been marked in the custom metadata, returns null. - */ - @Nullable - private static String getIndexToBeForceMerged(String originalIndex, ProjectState projectState) { - return Optional.ofNullable(projectState.metadata().index(originalIndex)) - .map(indexMetadata -> indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)) - .map(customMetadata -> customMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY)) - .orElse(null); - } - - /** - * Gets a prefixed name for the clone index based on the original index name - * - * @param originalName the original index name - * @return a prefixed clone index name - */ - public static String getDLMCloneIndexName(String originalName) { - return CLONE_INDEX_PREFIX + originalName; - } - - /** - * Gets the creation time of the clone index in milliseconds. If the clone index is part of a data stream, it attempts - * to get the creation time from the data stream's generation lifecycle date for the clone index. If that is not available, - * it falls back to the creation date of the clone index metadata. - * - * @param cloneIndex the name of the clone index - * @param cloneIndexMetadata the metadata of the clone index - * @param projectMetadata the project metadata containing data stream information - * @return the creation time in milliseconds - */ - protected static long getCloneIndexCreationTime(String cloneIndex, IndexMetadata cloneIndexMetadata, ProjectMetadata projectMetadata) { - return Optional.ofNullable(projectMetadata.getIndicesLookup()) - .map(indicesLookup -> indicesLookup.get(cloneIndex)) - .map(IndexAbstraction::getParentDataStream) - .map(dataStream -> dataStream.getGenerationLifecycleDate(cloneIndexMetadata)) - .map(TimeValue::millis) - .orElse(cloneIndexMetadata.getCreationDate()); - } - -} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java deleted file mode 100644 index a50b636ec6228..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions.steps; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; -import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Strings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.lifecycle.ForceMergeRequestWrapper; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; - -import java.util.Arrays; -import java.util.Optional; -import java.util.stream.Collectors; - -/** - * A DLM step responsible for force merging the index. - */ -public class ForceMergeStep implements DlmStep { - - /** - * Index setting that indicates whether DLM force merge has been completed for this index. - */ - public static final Setting DLM_FORCE_MERGE_COMPLETE_SETTING = Setting.boolSetting( - "index.dlm.force_merge_complete", - false, - Setting.Property.Dynamic, - Setting.Property.IndexScope - ); - - private static final Settings FORCE_MERGE_COMPLETE_SETTINGS = Settings.builder() - .put(DLM_FORCE_MERGE_COMPLETE_SETTING.getKey(), true) - .build(); - private static final int SINGLE_SEGMENT = 1; - private static final Logger logger = LogManager.getLogger(ForceMergeStep.class); - - /** - * Determines if the step has been completed for the given index and project state. - * - * @param index The index to check. - * @param projectState The current project state. - * @return True if the step is completed, false otherwise. - */ - @Override - public boolean stepCompleted(Index index, ProjectState projectState) { - return isDLMForceMergeComplete(index, projectState); - } - - /** - * This method determines how to execute the step and performs the necessary operations to update the index - * so that {@link #stepCompleted(Index, ProjectState)} will return true after successful execution. - * - * @param stepContext The context and resources for executing the step. - */ - @Override - public void execute(DlmStepContext stepContext) { - maybeForceMerge(stepContext); - } - - /** - * Helper method to check if DLM force merge has been completed for the given index. - * This reads the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} from the index metadata. - * - * @param index The index to check. - * @param projectState The current project state. - * @return True if the force merge has been completed, false otherwise. - */ - protected boolean isDLMForceMergeComplete(Index index, ProjectState projectState) { - return Optional.ofNullable(projectState.metadata().index(index)) - .map(indexMetadata -> DLM_FORCE_MERGE_COMPLETE_SETTING.get(indexMetadata.getSettings())) - .orElse(false); - } - - /** - * Helper method to mark the index as having completed DLM force merge by updating the index setting. - * This writes the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} to the index metadata. - * - * @param stepContext The context containing the index and client for executing the update. - * @param listener The listener to notify upon completion or failure. - */ - protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListener listener) { - String indexName = stepContext.indexName(); - - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(FORCE_MERGE_COMPLETE_SETTINGS, indexName); - - String failureMessage = Strings.format( - "DLM service encountered an error trying to mark force merge as complete for index [%s]", - indexName - ); - - stepContext.executeDeduplicatedRequest( - TransportUpdateSettingsAction.TYPE.name(), - updateSettingsRequest, - failureMessage, - (req, unused) -> stepContext.client() - .projectClient(stepContext.projectId()) - .admin() - .indices() - .updateSettings(updateSettingsRequest, ActionListener.wrap(acknowledgedResponse -> { - if (acknowledgedResponse.isAcknowledged()) { - listener.onResponse(null); - } else { - listener.onFailure( - new ElasticsearchException( - Strings.format( - "Failed to mark force merge as complete for index [%s] because " - + "the update settings request was not acknowledged", - indexName - ) - ) - ); - } - }, listener::onFailure)) - ); - } - - /** - * Helper method to execute the force merge request for the given index. This method forms the request and uses the - * step context to execute it in a deduplicated manner. The actual execution of the force merge request is - * delegated to the {@link #forceMerge} method. Checks if the force merge has already been completed for the - * index before executing and skips execution if so. Also skips if the index does not exist in the project metadata. - */ - void maybeForceMerge(DlmStepContext stepContext) { - Index index = stepContext.index(); - boolean indexMissing = Optional.ofNullable(stepContext.projectState()) - .map(ProjectState::metadata) - .map(metadata -> metadata.index(index)) - .isEmpty(); - - if (indexMissing) { - logger.warn("Index [{}] not found in project metadata, skipping force merge step", index); - return; - } - - if (isDLMForceMergeComplete(stepContext.index(), stepContext.projectState())) { - logger.info("DLM force merge step is already completed for index [{}], skipping execution", stepContext.indexName()); - return; - } - - ForceMergeRequest forceMergeRequest = formForceMergeRequest(index.getName()); - stepContext.executeDeduplicatedRequest( - ForceMergeAction.NAME, - new ForceMergeRequestWrapper(forceMergeRequest), - Strings.format("DLM service encountered an error trying to force merge index [%s]", index), - (req, l) -> forceMerge(stepContext.projectId(), forceMergeRequest, l, stepContext) - ); - } - - /** - * This method executes the given force merge request. Once the request has completed successfully it updates - * the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} in the cluster state indicating that the force merge has completed. - * The listener is notified after the cluster state update has been made, or when the force merge fails or the - * update to the cluster state fails. - */ - protected void forceMerge( - ProjectId projectId, - ForceMergeRequest forceMergeRequest, - ActionListener listener, - DlmStepContext stepContext - ) { - assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1 : "DLM force merges one index at a time"; - - final String targetIndex = forceMergeRequest.indices()[0]; - logger.info("DLM is issuing a request to force merge index [{}] to a single segment", targetIndex); - stepContext.client() - .projectClient(projectId) - .admin() - .indices() - .forceMerge(forceMergeRequest, listener.delegateFailureAndWrap((l, forceMergeResponse) -> { - if (forceMergeResponse.getFailedShards() > 0) { - DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures(); - String message = Strings.format( - "DLM failed to force merge %d shards for index [%s] due to failures [%s]", - forceMergeResponse.getFailedShards(), - targetIndex, - failures == null - ? "unknown" - : Arrays.stream(failures).map(DefaultShardOperationFailedException::toString).collect(Collectors.joining(",")) - ); - l.onFailure(new ElasticsearchException(message)); - } else if (forceMergeResponse.getUnavailableShards() > 0) { - String message = Strings.format( - "DLM could not complete force merge for index [%s] because [%d] shards were unavailable." - + "This will be retried in the next cycle.", - targetIndex, - forceMergeResponse.getUnavailableShards() - ); - l.onFailure(new ElasticsearchException(message)); - } else { - logger.info("DLM successfully force merged index [{}]", targetIndex); - markDLMForceMergeComplete(stepContext, listener); - } - })); - } - - private ForceMergeRequest formForceMergeRequest(String index) { - ForceMergeRequest req = new ForceMergeRequest(index); - req.maxNumSegments(SINGLE_SEGMENT); - req.timeout(TimeValue.MAX_VALUE); - return req; - } - - /** - * A human-readable name for the step. - * - * @return The step name. - */ - @Override - public String stepName() { - return "Force Merge Index"; - } -} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ReadOnlyStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ReadOnlyStep.java deleted file mode 100644 index a03c1b8c54687..0000000000000 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ReadOnlyStep.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions.steps; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; -import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; -import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.core.Strings; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import static org.apache.logging.log4j.LogManager.getLogger; -import static org.elasticsearch.action.support.master.MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT; -import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; - -/** - * This step makes the index read-only when executed - */ -public class ReadOnlyStep implements DlmStep { - - private static final Logger logger = getLogger(ReadOnlyStep.class); - - @Override - public boolean stepCompleted(Index index, ProjectState projectState) { - return projectState.blocks().hasIndexBlock(projectState.projectId(), index.getName(), WRITE.getBlock()); - } - - @Override - public void execute(DlmStepContext stepContext) { - ProjectId projectId = stepContext.projectId(); - String indexName = stepContext.indexName(); - - AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(WRITE, indexName).masterNodeTimeout( - INFINITE_MASTER_NODE_TIMEOUT - ); - // Force a flush while adding the read-only block to ensure all in-flight writes are completed and written to segments - addIndexBlockRequest.markVerified(true); - - stepContext.executeDeduplicatedRequest( - TransportAddIndexBlockAction.TYPE.name(), - addIndexBlockRequest, - Strings.format("DLM service encountered an error trying to mark index [%s] as readonly", indexName), - (req, reqListener) -> addIndexBlock(projectId, addIndexBlockRequest, reqListener, stepContext) - ); - } - - private void addIndexBlock( - ProjectId projectId, - AddIndexBlockRequest addIndexBlockRequest, - ActionListener listener, - DlmStepContext stepContext - ) { - assert addIndexBlockRequest.indices() != null && addIndexBlockRequest.indices().length == 1 - : "DLM should update the index block for one index at a time"; - // "saving" the index name here so we don't capture the entire request - String targetIndex = addIndexBlockRequest.indices()[0]; - logger.trace("DLM issuing request to add block [{}] for index [{}]", addIndexBlockRequest.getBlock(), targetIndex); - stepContext.client() - .projectClient(projectId) - .admin() - .indices() - .addBlock( - addIndexBlockRequest, - new AddIndexBlockResponseActionListener(addIndexBlockRequest, targetIndex, listener, stepContext, projectId) - ); - } - - @Override - public String stepName() { - return "Make Index Read Only"; - } - - private static class AddIndexBlockResponseActionListener implements ActionListener { - private final AddIndexBlockRequest addIndexBlockRequest; - private final String targetIndex; - private final ActionListener listener; - private final DlmStepContext stepContext; - private final ProjectId projectId; - - private AddIndexBlockResponseActionListener( - AddIndexBlockRequest addIndexBlockRequest, - String targetIndex, - ActionListener listener, - DlmStepContext stepContext, - ProjectId projectId - ) { - this.addIndexBlockRequest = addIndexBlockRequest; - this.targetIndex = targetIndex; - this.listener = listener; - this.stepContext = stepContext; - this.projectId = projectId; - } - - @Override - public void onResponse(AddIndexBlockResponse addIndexBlockResponse) { - if (addIndexBlockResponse.isAcknowledged()) { - logger.info("DLM successfully added block [{}] for index [{}]", addIndexBlockRequest.getBlock(), targetIndex); - listener.onResponse(null); - } else { - Optional resultForTargetIndex = addIndexBlockResponse.getIndices() - .stream() - .filter(blockResult -> blockResult.getIndex().getName().equals(targetIndex)) - .findAny(); - if (resultForTargetIndex.isEmpty()) { - // This really should not happen but, if it does, mark as a fail and retry next DLM run - logger.trace( - "DLM received an unacknowledged response when attempting to add the " - + "read-only block to index [{}], but the response didn't contain an explicit result for the index.", - targetIndex - ); - listener.onFailure( - new ElasticsearchException("request to mark index [" + targetIndex + "] as read-only was not acknowledged") - ); - } else if (resultForTargetIndex.get().hasFailures()) { - AddIndexBlockResponse.AddBlockResult blockResult = resultForTargetIndex.get(); - if (blockResult.getException() != null) { - listener.onFailure(blockResult.getException()); - } else { - List shardFailures = new ArrayList<>( - blockResult.getShards().length - ); - for (AddIndexBlockResponse.AddBlockShardResult shard : blockResult.getShards()) { - if (shard.hasFailures()) { - shardFailures.addAll(Arrays.asList(shard.getFailures())); - } - } - assert shardFailures.isEmpty() == false - : "The block response must have shard failures as the global " - + "exception is null. The block result is: " - + blockResult; - String errorMessage = org.elasticsearch.common.Strings.collectionToDelimitedString( - shardFailures.stream().map(org.elasticsearch.common.Strings::toString).collect(Collectors.toList()), - "," - ); - listener.onFailure(new ElasticsearchException(errorMessage)); - } - } else { - listener.onFailure( - new ElasticsearchException("request to mark index [" + targetIndex + "] as read-only was not acknowledged") - ); - } - } - } - - @Override - public void onFailure(Exception e) { - if (e instanceof IndexNotFoundException) { - // index was already deleted, treat this as a success - logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex); - stepContext.errorStore().clearRecordedError(projectId, targetIndex); - listener.onResponse(null); - return; - } - - listener.onFailure(e); - } - } -} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsPluginTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsPluginTests.java deleted file mode 100644 index c173d5ee6c4ba..0000000000000 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsPluginTests.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.datastreams; - -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.DataStreamLifecycle; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmActionContext; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.test.ESTestCase; - -import java.util.List; -import java.util.function.Function; - -import static org.hamcrest.Matchers.containsString; - -public class DataStreamsPluginTests extends ESTestCase { - - public void testVerifyActionsWithEmptyList() { - DataStreamsPlugin.verifyActions(List.of()); - } - - public void testVerifyActionsWithDefaultOutputPatterns() { - TestDlmStep step = new TestDlmStep("step-1"); - DlmAction action = new TestDlmAction("valid-action", List.of(step)); - DataStreamsPlugin.verifyActions(List.of(action)); - } - - public void testVerifyActionsWithSingleCustomOutputPattern() { - TestDlmStep step = new TestDlmStep("step-1"); - step.outputIndexNamePatternFunctions = List.of(name -> "cloned-" + name); - DlmAction action = new TestDlmAction("valid-action", List.of(step)); - DataStreamsPlugin.verifyActions(List.of(action)); - } - - public void testVerifyActionsWithMultipleCustomOutputPatterns() { - TestDlmStep step = new TestDlmStep("step-1"); - step.outputIndexNamePatternFunctions = List.of(name -> "partial-" + name, name -> "full-" + name); - DlmAction action = new TestDlmAction("valid-action", List.of(step)); - DataStreamsPlugin.verifyActions(List.of(action)); - } - - public void testVerifyActionsWithMixOfDefaultAndCustomOutputPatterns() { - TestDlmStep defaultStep = new TestDlmStep("default-step"); - TestDlmStep customStep = new TestDlmStep("custom-step"); - customStep.outputIndexNamePatternFunctions = List.of(name -> "cloned-" + name); - DlmAction action = new TestDlmAction("valid-action", List.of(defaultStep, customStep)); - DataStreamsPlugin.verifyActions(List.of(action)); - } - - public void testVerifyActionsWithMultipleValidActions() { - TestDlmStep step1 = new TestDlmStep("step-1"); - step1.outputIndexNamePatternFunctions = List.of(name -> "cloned-" + name); - DlmAction action1 = new TestDlmAction("action-1", List.of(step1, new TestDlmStep("step-2"))); - DlmAction action2 = new TestDlmAction("action-2", List.of(new TestDlmStep("step-3"))); - DataStreamsPlugin.verifyActions(List.of(action1, action2)); - } - - public void testVerifyActionsThrowsOnActionWithNoSteps() { - DlmAction action = new TestDlmAction("empty-action", List.of()); - IllegalStateException e = expectThrows(IllegalStateException.class, () -> DataStreamsPlugin.verifyActions(List.of(action))); - assertThat(e.getMessage(), containsString("DLM action [empty-action] must have at least one step")); - } - - public void testVerifyActionsThrowsOnStepWithEmptyOutputPatterns() { - TestDlmStep step = new TestDlmStep("bad-step"); - step.outputIndexNamePatternFunctions = List.of(); - DlmAction action = new TestDlmAction("my-action", List.of(step)); - - IllegalStateException e = expectThrows(IllegalStateException.class, () -> DataStreamsPlugin.verifyActions(List.of(action))); - assertThat(e.getMessage(), containsString("DLM step [bad-step] in action [my-action]")); - assertThat(e.getMessage(), containsString("must have at least one possible output index name pattern")); - } - - public void testVerifyActionsThrowsOnSecondActionWithNoSteps() { - DlmAction validAction = new TestDlmAction("valid-action", List.of(new TestDlmStep("step-1"))); - DlmAction emptyAction = new TestDlmAction("broken-action", List.of()); - IllegalStateException e = expectThrows( - IllegalStateException.class, - () -> DataStreamsPlugin.verifyActions(List.of(validAction, emptyAction)) - ); - assertThat(e.getMessage(), containsString("DLM action [broken-action] must have at least one step")); - } - - public void testVerifyActionsThrowsOnSecondStepWithEmptyOutputPatterns() { - TestDlmStep goodStep = new TestDlmStep("good-step"); - TestDlmStep badStep = new TestDlmStep("bad-step"); - badStep.outputIndexNamePatternFunctions = List.of(); - DlmAction action = new TestDlmAction("my-action", List.of(goodStep, badStep)); - - IllegalStateException e = expectThrows(IllegalStateException.class, () -> DataStreamsPlugin.verifyActions(List.of(action))); - assertThat(e.getMessage(), containsString("DLM step [bad-step] in action [my-action]")); - } - - private static class TestDlmStep implements DlmStep { - private final String name; - List> outputIndexNamePatternFunctions = null; - - TestDlmStep(String name) { - this.name = name; - } - - @Override - public boolean stepCompleted(Index index, ProjectState projectState) { - return false; - } - - @Override - public void execute(DlmStepContext dlmStepContext) {} - - @Override - public String stepName() { - return name; - } - - @Override - public List possibleOutputIndexNamePatterns(String indexName) { - if (outputIndexNamePatternFunctions != null) { - return outputIndexNamePatternFunctions.stream().map(func -> func.apply(indexName)).toList(); - } - return DlmStep.super.possibleOutputIndexNamePatterns(indexName); - } - } - - private static class TestDlmAction implements DlmAction { - private final String name; - private final List steps; - - TestDlmAction(String name, List steps) { - this.name = name; - this.steps = steps; - } - - @Override - public String name() { - return name; - } - - @Override - public Function applyAfterTime() { - return dsl -> null; - } - - @Override - public List steps() { - return steps; - } - - @Override - public boolean canRunOnProject(DlmActionContext dlmActionContext) { - return true; - } - } -} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 4e6200bce1b82..cdccbb62a8cfc 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.datastreams.lifecycle; -import org.apache.logging.log4j.Level; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -34,7 +33,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; -import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -71,10 +69,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmActionContext; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; @@ -84,7 +78,6 @@ import org.elasticsearch.snapshots.EmptySnapshotsInfoService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; -import org.elasticsearch.test.MockLog; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.elasticsearch.threadpool.TestThreadPool; @@ -98,9 +91,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -114,10 +105,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.ALL; -import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.BACKING_INDICES; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED; import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.SUCCESS; @@ -160,7 +148,6 @@ public class DataStreamLifecycleServiceTests extends ESTestCase { private final DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create( ClusterSettings.createBuiltInClusterSettings() ); - private List actions; @Before public void setupServices() { @@ -192,8 +179,6 @@ public void setupServices() { ); DataStreamLifecycleErrorStore errorStore = new DataStreamLifecycleErrorStore(() -> now); - actions = new ArrayList<>(); - dataStreamLifecycleService = new DataStreamLifecycleService( Settings.EMPTY, client, @@ -204,8 +189,7 @@ public void setupServices() { errorStore, allocationService, new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, client, clusterService, errorStore), - globalRetentionSettings, - actions + globalRetentionSettings ); clientWaitLatch = null; invokerWaitLatch = null; @@ -215,7 +199,6 @@ public void setupServices() { @After public void cleanup() { clientSeenRequests.clear(); - actions.clear(); dataStreamLifecycleService.close(); clusterService.close(); threadPool.shutdownNow(); @@ -1585,8 +1568,7 @@ public void testTrackingTimeStats() { errorStore, mock(AllocationService.class), new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, getTransportRequestsRecordingClient(), clusterService, errorStore), - globalRetentionSettings, - Collections.emptyList() + globalRetentionSettings ); assertThat(service.getLastRunDuration(), is(nullValue())); assertThat(service.getTimeBetweenStarts(), is(nullValue())); @@ -1912,637 +1894,6 @@ private interface DoExecuteDelegate { void doExecute(ActionType action, ActionRequest request, ActionListener listener); } - private static class TestDlmStep implements DlmStep { - boolean throwOnExecute = false; - boolean isCompleted = false; - int completedCheckCount = 0; - int executeCount = 0; - final Set executedIndices = new HashSet<>(); - List> outputIndexNamePatternFunctions = null; - - @Override - public boolean stepCompleted(Index index, ProjectState projectState) { - completedCheckCount++; - return isCompleted; - } - - @Override - public void execute(DlmStepContext dlmStepContext) { - executeCount++; - executedIndices.add(dlmStepContext.indexName()); - if (throwOnExecute) { - throw new RuntimeException("Test exception from DlmStep execute"); - } - } - - @Override - public String stepName() { - return "Test Step"; - } - - @Override - public List possibleOutputIndexNamePatterns(String indexName) { - if (outputIndexNamePatternFunctions != null) { - return outputIndexNamePatternFunctions.stream().map(func -> func.apply(indexName)).toList(); - } - return DlmStep.super.possibleOutputIndexNamePatterns(indexName); - } - } - - private static class TestDlmAction implements DlmAction { - private final List steps; - private final TimeValue schedule; - private boolean actionScheduleChecked = false; - private final boolean appliesFailureStore; - private final boolean canRunOnProjectResult; - - private TestDlmAction(TimeValue schedule, DlmStep... steps) { - this(schedule, false, true, steps); - } - - private TestDlmAction(TimeValue schedule, boolean appliesFailureStore, DlmStep... steps) { - this(schedule, appliesFailureStore, true, steps); - } - - private TestDlmAction(TimeValue schedule, boolean appliesFailureStore, boolean canRunOnProjectResult, DlmStep... steps) { - this.steps = Arrays.asList(steps); - this.schedule = schedule; - this.appliesFailureStore = appliesFailureStore; - this.canRunOnProjectResult = canRunOnProjectResult; - } - - @Override - public String name() { - return "Test DLM Action"; - } - - @Override - public List steps() { - return steps; - } - - @Override - public Function applyAfterTime() { - actionScheduleChecked = true; - return dsl -> schedule; - } - - @Override - public boolean appliesToFailureStore() { - return appliesFailureStore; - } - - @Override - public boolean canRunOnProject(DlmActionContext dlmActionContext) { - return canRunOnProjectResult; - } - } - - public void testUnscheduledTierTransition() { - TestDlmStep step1 = new TestDlmStep(); - TestDlmAction action = new TestDlmAction(null, step1); - - actions.add(action); - - HashSet indicesToExclude; - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - int numBackingIndices = 3; - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle zeroRetentionDataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStreamLifecycle zeroRetentionFailuresLifecycle = DataStreamLifecycle.failuresLifecycleBuilder() - .dataRetention(TimeValue.ZERO) - .build(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - numBackingIndices, - 2, - settings(IndexVersion.current()), - zeroRetentionDataLifecycle, - zeroRetentionFailuresLifecycle, - now - ); - builder.put(dataStream); - - indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(null, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - assertThat(step1.completedCheckCount, equalTo(0)); - assertThat(step1.executeCount, equalTo(0)); - assertThat(indicesToExclude, empty()); - assertThat(processedIndices, empty()); - } - - public void testMaybeProcessDlmActionsNoEligibleIndices() { - TestDlmStep step = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueHours(1); - TestDlmAction action = new TestDlmAction(schedule, step); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - assertThat(step.completedCheckCount, equalTo(0)); - assertThat(step.executeCount, equalTo(0)); - assertThat(indicesToExclude, empty()); - assertThat(processedIndices, empty()); - } - - public void testMaybeProcessDlmActionsEligibleIndicesExcluded() { - TestDlmStep step = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, step); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesEligible = new HashSet<>( - dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES) - ); - Set indicesToExclude = new HashSet<>(indicesEligible); - - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - assertThat(step.completedCheckCount, equalTo(0)); - assertThat(step.executeCount, equalTo(0)); - assertThat(indicesToExclude, equalTo(indicesEligible)); - assertThat(processedIndices, empty()); - } - - public void testMaybeProcessDlmActionsPartialIndicesExcluded() { - TestDlmStep step = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, step); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - List indicesEligible = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES); - - // Exclude only the first half of eligible indices - Set indicesToExclude = new HashSet<>(); - Set expectedExcludedIndices = new HashSet<>(); - for (int i = 0; i < indicesEligible.size() / 2; i++) { - Index index = indicesEligible.get(i); - indicesToExclude.add(index); - expectedExcludedIndices.add(index); - } - - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - - for (Index excludedIndex : expectedExcludedIndices) { - assertThat( - "Step should not be executed for excluded index: " + excludedIndex.getName(), - step.executedIndices.contains(excludedIndex.getName()), - is(false) - ); - } - - for (int i = indicesEligible.size() / 2; i < indicesEligible.size(); i++) { - Index nonExcludedIndex = indicesEligible.get(i); - assertThat( - "Step should be executed for non-excluded index: " + nonExcludedIndex.getName(), - step.executedIndices.contains(nonExcludedIndex.getName()), - is(true) - ); - } - - int expectedExecuteCount = indicesEligible.size() - expectedExcludedIndices.size(); - assertThat(step.executeCount, equalTo(expectedExecuteCount)); - assertThat(processedIndices, hasSize(indicesEligible.size() - indicesToExclude.size())); - } - - public void testMaybeProcessDlmActionsAllStepsCompleted() { - TestDlmStep step1 = new TestDlmStep(); - step1.isCompleted = true; - TestDlmStep step2 = new TestDlmStep(); - step2.isCompleted = true; - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, step1, step2); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - int eligibleCount = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES).size(); - assertThat(step1.completedCheckCount, equalTo(0)); - assertThat(step2.completedCheckCount, equalTo(eligibleCount)); - assertThat(step1.executeCount, equalTo(0)); - assertThat(step2.executeCount, equalTo(0)); - assertThat(indicesToExclude, empty()); - assertThat(processedIndices, empty()); - } - - public void testMaybeProcessDlmActionsOneStepCompleted() { - TestDlmStep step1 = new TestDlmStep(); - step1.isCompleted = true; - TestDlmStep step2 = new TestDlmStep(); - step2.isCompleted = false; - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, step1, step2); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - int eligibleCount = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES).size(); - assertThat(step1.completedCheckCount, equalTo(eligibleCount)); - assertThat(step2.completedCheckCount, equalTo(eligibleCount)); - assertThat(step1.executeCount, equalTo(0)); - assertThat(step2.executeCount, equalTo(eligibleCount)); - assertThat(indicesToExclude, empty()); - assertThat(processedIndices, hasSize(eligibleCount)); - } - - public void testMaybeProcessDlmActionsNoStepsCompleted() { - TestDlmStep step1 = new TestDlmStep(); - TestDlmStep step2 = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, step1, step2); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - int eligibleCount = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES).size(); - assertThat(step1.completedCheckCount, equalTo(eligibleCount)); - assertThat(step2.completedCheckCount, equalTo(eligibleCount)); - assertThat(step1.executeCount, equalTo(eligibleCount)); - assertThat(step2.executeCount, equalTo(0)); - assertThat(indicesToExclude, empty()); - assertThat(processedIndices, hasSize(eligibleCount)); - } - - public void testMaybeProcessDlmActionsStepExecutionThrows() { - TestDlmStep step1 = new TestDlmStep(); - TestDlmStep step2 = new TestDlmStep(); - step1.throwOnExecute = true; - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, step1, step2); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream(builder, dataStreamName, 3, 0, settings(IndexVersion.current()), dataLifecycle, null, now); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices; - - try (var mockLog = MockLog.capture(DataStreamLifecycleService.class)) { - mockLog.addExpectation( - new MockLog.SeenEventExpectation( - "step execution warning", - DataStreamLifecycleService.class.getCanonicalName(), - Level.WARN, - "Unable to execute step [Test Step] for action [Test DLM Action]" - ) - ); - - processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - mockLog.assertAllExpectationsMatched(); - } - - assertThat(action.actionScheduleChecked, equalTo(true)); - int eligibleCount = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES).size(); - assertThat(step1.completedCheckCount, equalTo(eligibleCount)); - assertThat(step2.completedCheckCount, equalTo(eligibleCount)); - assertThat(step1.executeCount, equalTo(eligibleCount)); - assertThat(step2.executeCount, equalTo(0)); - assertThat(indicesToExclude, empty()); - assertThat(processedIndices, empty()); - } - - public void testActionUsesAppliesToFailureStoreForBackingIndices() { - TestDlmStep step1 = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, false, step1); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - int numBackingIndices = 3; - int numFailureIndices = 2; - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - numBackingIndices, - numFailureIndices, - settings(IndexVersion.current()), - dataLifecycle, - null, - now - ); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - // When appliesToFailureStore is false, only backing indices should be processed - List backingIndicesEligible = dataStream.getIndicesOlderThan( - projectState.metadata()::index, - () -> now, - schedule, - BACKING_INDICES - ); - assertThat(processedIndices, hasSize(numBackingIndices - 1)); // all but the write index, no failure store indices - assertThat(processedIndices, hasSize(backingIndicesEligible.size())); - assertThat(step1.executeCount, equalTo(backingIndicesEligible.size())); - } - - public void testActionUsesAppliesToFailureStoreForFailureIndices() { - TestDlmStep step1 = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, true, step1); - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - int numBackingIndices = 3; - int numFailureIndices = 2; - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - numBackingIndices, - numFailureIndices, - settings(IndexVersion.current()), - dataLifecycle, - null, - now - ); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - // When appliesToFailureStore is true, failure indices should be included - List failureIndicesEligible = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, ALL); - // all but the write backing index, and write failure index - assertThat(processedIndices, hasSize(numBackingIndices + numFailureIndices - 2)); - assertThat(step1.executeCount, equalTo(failureIndicesEligible.size())); - assertThat(processedIndices, hasSize(failureIndicesEligible.size())); - } - - public void testMaybeProcessDlmActionsCanRunOnProjectReturnsTrue() { - TestDlmStep step1 = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, false, true, step1); // canRunOnProject returns true - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - int numBackingIndices = 3; - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - numBackingIndices, - 0, - settings(IndexVersion.current()), - dataLifecycle, - null, - now - ); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(true)); - // Action should be executed, so step1.executeCount should be > 0 - int eligibleCount = dataStream.getIndicesOlderThan(projectState.metadata()::index, () -> now, schedule, BACKING_INDICES).size(); - assertThat(step1.executeCount, equalTo(eligibleCount)); - assertThat(processedIndices, hasSize(eligibleCount)); - } - - public void testMaybeProcessDlmActionsCanRunOnProjectReturnsFalse() { - TestDlmStep step1 = new TestDlmStep(); - TimeValue schedule = TimeValue.timeValueMillis(1); - TestDlmAction action = new TestDlmAction(schedule, false, false, step1); // canRunOnProject returns false - actions.add(action); - - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - int numBackingIndices = 3; - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - DataStreamLifecycle dataLifecycle = DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - numBackingIndices, - 0, - settings(IndexVersion.current()), - dataLifecycle, - null, - now - ); - builder.put(dataStream); - ProjectState projectState = projectStateFromProject(builder); - - Set indicesToExclude = new HashSet<>(); - Set processedIndices = dataStreamLifecycleService.maybeProcessDlmActions(projectState, dataStream, indicesToExclude); - - assertThat(action.actionScheduleChecked, equalTo(false)); - // Action should NOT be executed, so step1.executeCount should be 0 - assertThat(step1.executeCount, equalTo(0)); - assertThat(processedIndices, empty()); - } - - public void testResolveIndexOutputFromPreviousStepExecutionFirstStep() { - TestDlmStep step1 = new TestDlmStep(); - TestDlmStep step2 = new TestDlmStep(); - TestDlmAction action = new TestDlmAction(TimeValue.timeValueMillis(1), step1, step2); - - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - IndexMetadata indexMetadata = IndexMetadata.builder("original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - builder.put(indexMetadata, false); - ProjectState projectState = projectStateFromProject(builder); - - Index originalIndex = indexMetadata.getIndex(); - Index result = dataStreamLifecycleService.resolveIndexOutputFromPreviousStep(0, originalIndex, action, projectState); - assertThat(result, is(originalIndex)); - } - - public void testResolveIndexForStepExecutionPreviousStepOutputMatchesExistingIndex() { - TestDlmStep step1 = new TestDlmStep(); - step1.outputIndexNamePatternFunctions = List.of(name -> "cloned-" + name); - TestDlmStep step2 = new TestDlmStep(); - TestDlmAction action = new TestDlmAction(TimeValue.timeValueMillis(1), step1, step2); - - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - IndexMetadata originalMeta = IndexMetadata.builder("original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - IndexMetadata clonedMeta = IndexMetadata.builder("cloned-original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - builder.put(originalMeta, false); - builder.put(clonedMeta, false); - ProjectState projectState = projectStateFromProject(builder); - - Index result = dataStreamLifecycleService.resolveIndexOutputFromPreviousStep(1, originalMeta.getIndex(), action, projectState); - assertThat(result, is(clonedMeta.getIndex())); - } - - public void testResolveIndexForStepExecutionNoMatchThrowsAssertionError() { - TestDlmStep step1 = new TestDlmStep(); - step1.outputIndexNamePatternFunctions = List.of(name -> "nonexistent-" + name); - TestDlmStep step2 = new TestDlmStep(); - TestDlmAction action = new TestDlmAction(TimeValue.timeValueMillis(1), step1, step2); - - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - IndexMetadata originalMeta = IndexMetadata.builder("original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - builder.put(originalMeta, false); - ProjectState projectState = projectStateFromProject(builder); - - Index originalIndex = originalMeta.getIndex(); - AssertionError e = expectThrows( - AssertionError.class, - () -> dataStreamLifecycleService.resolveIndexOutputFromPreviousStep(1, originalIndex, action, projectState) - ); - assertThat( - e.getMessage(), - is( - "Unable to resolve index name for executing step [Test Step] for action [Test DLM Action] with index [" - + originalIndex.getName() - + "]" - ) - ); - } - - public void testResolveIndexOutputFromPreviousStepMultiplePatternsFirstMatchWins() { - TestDlmStep step1 = new TestDlmStep(); - step1.outputIndexNamePatternFunctions = List.of(name -> "first-" + name, name -> "second-" + name); - TestDlmStep step2 = new TestDlmStep(); - TestDlmAction action = new TestDlmAction(TimeValue.timeValueMillis(1), step1, step2); - - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - IndexMetadata originalMeta = IndexMetadata.builder("original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - IndexMetadata firstMeta = IndexMetadata.builder("first-original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - IndexMetadata secondMeta = IndexMetadata.builder("second-original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - builder.put(originalMeta, false); - builder.put(firstMeta, false); - builder.put(secondMeta, false); - ProjectState projectState = projectStateFromProject(builder); - - Index result = dataStreamLifecycleService.resolveIndexOutputFromPreviousStep(1, originalMeta.getIndex(), action, projectState); - assertThat(result, is(firstMeta.getIndex())); - } - - public void testResolveIndexOutputFromPreviousStepSkipsNonMatchingPatternsUsesSecond() { - TestDlmStep step1 = new TestDlmStep(); - step1.outputIndexNamePatternFunctions = List.of(name -> "nonexistent-" + name, name -> "second-" + name); - TestDlmStep step2 = new TestDlmStep(); - TestDlmAction action = new TestDlmAction(TimeValue.timeValueMillis(1), step1, step2); - - ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); - IndexMetadata originalMeta = IndexMetadata.builder("original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - IndexMetadata secondMeta = IndexMetadata.builder("second-original-index") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - builder.put(originalMeta, false); - builder.put(secondMeta, false); - ProjectState projectState = projectStateFromProject(builder); - - Index result = dataStreamLifecycleService.resolveIndexOutputFromPreviousStep(1, originalMeta.getIndex(), action, projectState); - assertThat(result, is(secondMeta.getIndex())); - } - - public void testDlmStepDefaultPossibleOutputIndexNamePatterns() { - TestDlmStep step = new TestDlmStep(); - String testName = randomAlphaOfLength(10); - List patterns = step.possibleOutputIndexNamePatterns(testName); - assertThat(patterns, hasSize(1)); - assertThat(patterns.getFirst(), is(testName)); - } - public void testFormatExecutionTimeMilliseconds() { assertThat(DataStreamLifecycleService.formatExecutionTime(500), equalTo("500ms/500ms")); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStepTests.java deleted file mode 100644 index 1908d747e5fca..0000000000000 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStepTests.java +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions.steps; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.ResultDeduplicator; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.project.TestProjectResolvers; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportRequest; -import org.junit.After; -import org.junit.Before; - -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.time.Instant; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; -import static org.elasticsearch.datastreams.lifecycle.transitions.steps.CloneStep.CLONE_INDEX_PREFIX; -import static org.elasticsearch.datastreams.lifecycle.transitions.steps.CloneStep.formCloneRequest; -import static org.elasticsearch.datastreams.lifecycle.transitions.steps.CloneStep.getDLMCloneIndexName; -import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -public class CloneStepTests extends ESTestCase { - - private CloneStep cloneStep; - private ProjectId projectId; - private String indexName; - private Index index; - private ThreadPool threadPool; - private Client client; - private DataStreamLifecycleErrorStore errorStore; - private ResultDeduplicator, Void> deduplicator; - private AtomicReference> capturedCloneListener; - private AtomicReference> capturedDeleteListener; - private AtomicReference capturedResizeRequest; - private AtomicReference capturedDeleteRequest; - private AtomicReference capturedMarkRequest; - private Clock fixedClock; - - @Before - public void setup() { - threadPool = new TestThreadPool(getTestName()); - cloneStep = new CloneStep(); - projectId = randomProjectIdOrDefault(); - indexName = randomAlphaOfLength(10); - index = new Index(indexName, randomAlphaOfLength(10)); - errorStore = new DataStreamLifecycleErrorStore(System::currentTimeMillis); - deduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); - capturedCloneListener = new AtomicReference<>(); - capturedDeleteListener = new AtomicReference<>(); - capturedResizeRequest = new AtomicReference<>(); - capturedDeleteRequest = new AtomicReference<>(); - capturedMarkRequest = new AtomicReference<>(); - fixedClock = Clock.fixed(Instant.parse("2026-02-24T12:00:00Z"), Clock.systemDefaultZone().getZone()); - client = new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) { - @Override - @SuppressWarnings("unchecked") - protected void doExecute( - ActionType action, - Request request, - ActionListener listener - ) { - if (request instanceof ResizeRequest resizeRequest) { - capturedResizeRequest.set(resizeRequest); - capturedCloneListener.set((ActionListener) listener); - } else if (request instanceof DeleteIndexRequest deleteIndexRequest) { - capturedDeleteRequest.set(deleteIndexRequest); - capturedDeleteListener.set((ActionListener) listener); - } else if (request instanceof MarkIndexForDLMForceMergeAction.Request markRequest) { - capturedMarkRequest.set(markRequest); - } - } - }; - } - - @After - public void cleanup() { - terminate(threadPool); - } - - public void testStepName() { - assertThat(cloneStep.stepName(), equalTo("Clone Index")); - } - - public void testPossibleOutputIndexNamePatterns() { - List patterns = cloneStep.possibleOutputIndexNamePatterns(indexName); - assertThat(patterns, contains(getDLMCloneIndexName(indexName), indexName)); - } - - public void testStepNotCompletedWhenNoCloneIndexExists() { - ProjectState projectState = projectStateBuilder().build(); - assertFalse(cloneStep.stepCompleted(index, projectState)); - } - - public void testStepNotCompletedWhenCloneNotMarkedInMetadata() { - ProjectState projectState = projectStateBuilder().withClone().build(); - assertFalse(cloneStep.stepCompleted(index, projectState)); - } - - public void testStepCompletedWhenCloneExistsAndMarkedInMetadata() { - String cloneIndexName = getDLMCloneIndexName(indexName); - Map customMetadata = Map.of(DLM_INDEX_FOR_FORCE_MERGE_KEY, cloneIndexName); - ProjectState projectState = projectStateBuilder().withClone().withCustomMetadata(customMetadata).withRouting().build(); - assertTrue(cloneStep.stepCompleted(index, projectState)); - } - - public void testStepNotCompletedWhenShardsNotActive() { - String cloneIndexName = getDLMCloneIndexName(indexName); - Map customMetadata = Map.of(DLM_INDEX_FOR_FORCE_MERGE_KEY, cloneIndexName); - ProjectState projectState = projectStateBuilder().withClone().withCustomMetadata(customMetadata).withRouting(false).build(); - assertFalse(cloneStep.stepCompleted(index, projectState)); - } - - public void testStepCompletedWhenOriginalIndexMarkedWithZeroReplicas() { - Map customMetadata = Map.of(DLM_INDEX_FOR_FORCE_MERGE_KEY, indexName); - ProjectState projectState = projectStateBuilder().withReplicas(0).withCustomMetadata(customMetadata).withRouting().build(); - assertTrue(cloneStep.stepCompleted(index, projectState)); - } - - public void testExecuteSkipsCloneWhenIndexHasZeroReplicas() { - ProjectState projectState = projectStateBuilder().withReplicas(0).build(); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - assertThat(capturedResizeRequest.get(), is(nullValue())); - - assertThat(capturedMarkRequest.get(), is(notNullValue())); - assertThat(capturedMarkRequest.get().getOriginalIndex(), equalTo(indexName)); - assertThat(capturedMarkRequest.get().getIndexToBeForceMerged(), equalTo(indexName)); - } - - public void testExecuteDeletesExistingCloneAndRetriesClone() { - // Create a clone index that was created more than 12 hours ago (stuck) - String cloneIndexName = getDLMCloneIndexName(indexName); - ProjectState projectState = setupStuckCloneScenario(13); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - // Should issue delete request for stuck clone - assertThat(capturedDeleteRequest.get(), is(notNullValue())); - assertThat(capturedDeleteRequest.get().indices()[0], equalTo(cloneIndexName)); - } - - public void testExecuteCreatesCloneWithCorrectSettings() { - ProjectState projectState = projectStateBuilder().build(); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - assertThat(capturedResizeRequest.get(), is(notNullValue())); - assertThat(capturedResizeRequest.get().getSourceIndex(), equalTo(indexName)); - assertThat(capturedResizeRequest.get().getTargetIndexRequest().index(), containsString("dlm-clone-")); - assertThat(capturedResizeRequest.get().getTargetIndexRequest().settings().get("index.number_of_replicas"), equalTo("0")); - assertTrue(capturedResizeRequest.get().getTargetIndexRequest().settings().keySet().contains("index.auto_expand_replicas")); - assertNull(capturedResizeRequest.get().getTargetIndexRequest().settings().get("index.auto_expand_replicas")); - } - - public void testExecuteWithSuccessfulCloneResponse() { - ProjectState projectState = projectStateBuilder().build(); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - assertThat("clone listener should be captured", capturedCloneListener.get(), is(notNullValue())); - - String cloneIndexName = getDLMCloneIndexName(indexName); - CreateIndexResponse response = new CreateIndexResponse(true, true, cloneIndexName); - capturedCloneListener.get().onResponse(response); - - assertThat(capturedMarkRequest.get(), is(notNullValue())); - assertThat(capturedMarkRequest.get().getOriginalIndex(), equalTo(indexName)); - assertThat(capturedMarkRequest.get().getIndexToBeForceMerged(), equalTo(cloneIndexName)); - } - - public void testExecuteWithFailedCloneResponse() { - ProjectState projectState = projectStateBuilder().build(); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - ElasticsearchException exception = new ElasticsearchException("clone failed"); - capturedCloneListener.get().onFailure(exception); - - // Should NOT attempt to delete the clone index since it was never created in metadata - assertThat(capturedDeleteRequest.get(), is(nullValue())); - } - - public void testGetDLMCloneIndexName() { - String name = "test-index"; - String cloneName = getDLMCloneIndexName(name); - assertThat("Clone name should be deterministic", cloneName, equalTo(getDLMCloneIndexName(name))); - assertThat("Clone name should contain prefix", cloneName, containsString(CLONE_INDEX_PREFIX)); - int shortNameLength = cloneName.getBytes(StandardCharsets.UTF_8).length; - assertThat("Clone name should not exceed 255 bytes", shortNameLength <= 255, is(true)); - - String name1 = "index-1"; - String name2 = "index-2"; - assertThat( - "Different names should produce different clone names", - getDLMCloneIndexName(name1), - not(equalTo(getDLMCloneIndexName(name2))) - ); - } - - public void testDeleteCloneSuccessfully() { - ProjectState projectState = setupStuckCloneScenario(13); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - // Respond to delete request successfully - AcknowledgedResponse deleteResponse = AcknowledgedResponse.of(true); - capturedDeleteListener.get().onResponse(deleteResponse); - } - - public void testDeleteCloneWithFailure() { - ProjectState projectState = setupStuckCloneScenario(13); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - // Respond to delete request with failure - ElasticsearchException exception = new ElasticsearchException("delete failed"); - capturedDeleteListener.get().onFailure(exception); - } - - public void testExecuteWaitsWhenCloneIsInProgressAndNotTimedOut() { - // Create a clone index that was created less than 12 hours ago - ProjectState projectState = setupStuckCloneScenario(6); - DlmStepContext stepContext = createStepContext(projectState); - cloneStep.execute(stepContext); - - // Should NOT issue a delete request since it's still fresh - assertThat("Should not delete clone that is still within timeout", capturedDeleteRequest.get(), is(nullValue())); - // Should NOT issue a new clone request - assertThat("Should not create new clone request while one is in progress", capturedResizeRequest.get(), is(nullValue())); - } - - public void testExecuteDeletesCloneWhenStuckForOver12Hours() { - // Create a clone index that was created more than 12 hours ago - String cloneIndexName = getDLMCloneIndexName(indexName); - ProjectState projectState = setupStuckCloneScenario(13); - DlmStepContext stepContext = createStepContext(projectState); - cloneStep.execute(stepContext); - - // Should issue delete request for stuck clone - assertThat("Should delete clone that exceeded timeout", capturedDeleteRequest.get(), is(notNullValue())); - assertThat(capturedDeleteRequest.get().indices()[0], equalTo(cloneIndexName)); - } - - public void testExecuteWaitsWhenCloneExistsButNotInDeduplicatorAndNotTimedOut() { - // Create a clone index that exists but is not in the deduplicator (completed but step not finished) - // and was created less than 12 hours ago - long creationTime = fixedClock.millis() - TimeValue.timeValueHours(2).millis(); // 2 hours ago - ProjectState projectState = projectStateBuilder().withClone().withCloneCreationTime(creationTime).build(); - DlmStepContext stepContext = createStepContext(projectState); - cloneStep.execute(stepContext); - // Should NOT issue a delete since it's not in the deduplicator (might be completing) - assertThat("Should not delete clone not in deduplicator", capturedDeleteRequest.get(), is(nullValue())); - // Should NOT issue a new clone request - assertThat("Should not create new clone while one exists", capturedResizeRequest.get(), is(nullValue())); - } - - public void testExecuteWaitsWhenCloneExistsOver12HoursButNotInDeduplicator() { - // Create a clone index that exists, is not in the deduplicator, and was created > 12 hours ago - // Since it's not in the deduplicator, but shards are not active, it should be deleted - long creationTime = fixedClock.millis() - TimeValue.timeValueHours(15).millis(); // 15 hours ago - ProjectState projectState = projectStateBuilder().withClone() - .withCloneCreationTime(creationTime) - .withCloneShardsNotActive() - .build(); - DlmStepContext stepContext = createStepContext(projectState); - cloneStep.execute(stepContext); - // Should delete - not in deduplicator but shards are not active and clone is old - assertThat( - "Should delete old clone if not in deduplicator and shards are not active", - capturedDeleteRequest.get(), - is(notNullValue()) - ); - } - - public void testExecuteCreatesNewCloneAfterTimeoutAndCleanup() { - // Test the full cycle: stuck clone gets deleted, then a new one is created on next run - String cloneIndexName = getDLMCloneIndexName(indexName); - ProjectState projectStateWithOldClone = setupStuckCloneScenario(14); - DlmStepContext stepContext = createStepContext(projectStateWithOldClone); - cloneStep.execute(stepContext); - - // First run: should delete the old clone - assertThat(capturedDeleteRequest.get(), is(notNullValue())); - assertThat(capturedDeleteRequest.get().indices()[0], equalTo(cloneIndexName)); - - // Simulate successful delete - capturedDeleteListener.get().onResponse(AcknowledgedResponse.of(true)); - - // Reset captures and clear deduplicator to simulate that the old stuck request is no longer tracked - capturedDeleteRequest.set(null); - capturedResizeRequest.set(null); - deduplicator.clear(); - - // Second run: now without the clone index - ProjectState projectStateWithoutClone = projectStateBuilder().build(); - DlmStepContext stepContext2 = createStepContext(projectStateWithoutClone); - cloneStep.execute(stepContext2); - - // Should now create a new clone - assertThat("Should create new clone after old one was deleted", capturedResizeRequest.get(), is(notNullValue())); - assertThat(capturedResizeRequest.get().getSourceIndex(), equalTo(indexName)); - } - - public void testCloneFailureWithGenericExceptionRecordsError() { - ProjectState projectState = projectStateBuilder().build(); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - // Simulate clone failure with a generic exception - assertThat(capturedCloneListener.get(), is(notNullValue())); - ElasticsearchException cloneFailure = new ElasticsearchException("clone operation failed"); - capturedCloneListener.get().onFailure(cloneFailure); - - // Error should be recorded - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("clone operation failed")); - } - - public void testStuckCloneCleanupFailureRecordsError() { - ProjectState projectState = setupStuckCloneScenario(13); - DlmStepContext stepContext = createStepContext(projectState); - - cloneStep.execute(stepContext); - - // Simulate failed cleanup of stuck clone - assertThat(capturedDeleteListener.get(), is(notNullValue())); - ElasticsearchException cleanupFailure = new ElasticsearchException("cleanup failed"); - capturedDeleteListener.get().onFailure(cleanupFailure); - - // Error should be recorded - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("cleanup failed")); - } - - /** - * Builder for creating ProjectState with various configurations for testing. - */ - private class ProjectStateBuilder { - private int numberOfReplicas = 1; - private Map customMetadata = null; - private String cloneIndexName = null; - private Long cloneCreationTime = null; - private boolean withRouting = false; - private boolean allShardsActive = true; - - ProjectStateBuilder withReplicas(int numberOfReplicas) { - this.numberOfReplicas = numberOfReplicas; - return this; - } - - ProjectStateBuilder withCustomMetadata(Map customMetadata) { - this.customMetadata = customMetadata; - return this; - } - - ProjectStateBuilder withClone() { - this.cloneIndexName = getDLMCloneIndexName(indexName); - return this; - } - - ProjectStateBuilder withClone(String cloneName) { - this.cloneIndexName = cloneName; - return this; - } - - ProjectStateBuilder withCloneCreationTime(long creationTimeMillis) { - this.cloneCreationTime = creationTimeMillis; - return this; - } - - ProjectStateBuilder withRouting() { - this.withRouting = true; - return this; - } - - ProjectStateBuilder withRouting(boolean allShardsActive) { - this.withRouting = true; - this.allShardsActive = allShardsActive; - return this; - } - - ProjectStateBuilder withCloneShardsNotActive() { - this.allShardsActive = false; - this.withRouting = true; - return this; - } - - ProjectState build() { - // Build original index metadata - IndexMetadata.Builder originalIndexBuilder = IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) - .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) - .build() - ) - .numberOfShards(1) - .numberOfReplicas(numberOfReplicas); - - if (customMetadata != null) { - originalIndexBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, customMetadata); - } - - IndexMetadata originalIndexMetadata = originalIndexBuilder.build(); - ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectId).put(originalIndexMetadata, false); - - // Build clone index metadata if requested - IndexMetadata cloneIndexMetadata = null; - if (cloneIndexName != null) { - IndexMetadata.Builder cloneBuilder = IndexMetadata.builder(cloneIndexName) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) - .numberOfShards(1) - .numberOfReplicas(0); - - if (cloneCreationTime != null) { - cloneBuilder.creationDate(cloneCreationTime); - } - - cloneIndexMetadata = cloneBuilder.build(); - projectMetadataBuilder.put(cloneIndexMetadata, false); - - DataStream dataStream = DataStream.builder( - "test-datastream-" + indexName, - java.util.List.of(cloneIndexMetadata.getIndex(), originalIndexMetadata.getIndex()) - ).setGeneration(2).build(); - projectMetadataBuilder.put(dataStream); - } - - // Build routing table if requested - RoutingTable routingTable = null; - if (withRouting) { - // Route to the clone if it exists, otherwise to the original - Index indexToRoute = cloneIndexMetadata != null ? cloneIndexMetadata.getIndex() : originalIndexMetadata.getIndex(); - ShardRouting primaryShard = TestShardRouting.newShardRouting( - new ShardId(indexToRoute, 0), - "node1", - true, - allShardsActive ? ShardRoutingState.STARTED : ShardRoutingState.INITIALIZING - ); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexToRoute).addShard(primaryShard); - routingTable = RoutingTable.builder().add(indexRoutingTableBuilder).build(); - } - - // Build final ProjectState - ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadataBuilder); - if (routingTable != null) { - clusterStateBuilder.putRoutingTable(projectId, routingTable); - } - - return clusterStateBuilder.build().projectState(projectId); - } - } - - private ProjectStateBuilder projectStateBuilder() { - return new ProjectStateBuilder(); - } - - private DlmStepContext createStepContext(ProjectState projectState) { - return new DlmStepContext(index, projectState, deduplicator, errorStore, randomIntBetween(1, 10), client, fixedClock); - } - - /** - * Helper method to create a stuck clone scenario where: - * - A clone index exists with the specified age in hours - * - The clone request is registered in the deduplicator (simulating in-progress request) - * - * @param hoursAgo Number of hours ago the clone was created - * @return ProjectState with the stuck clone index - */ - private ProjectState setupStuckCloneScenario(int hoursAgo) { - long creationTime = fixedClock.millis() - TimeValue.timeValueHours(hoursAgo).millis(); - String cloneIndexName = getDLMCloneIndexName(indexName); - ProjectState projectState = projectStateBuilder().withClone().withCloneCreationTime(creationTime).build(); - ResizeRequest cloneRequest = formCloneRequest(indexName, cloneIndexName); - deduplicator.executeOnce(Tuple.tuple(projectId, cloneRequest), ActionListener.noop(), (req, listener) -> {}); - - return projectState; - } -} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java deleted file mode 100644 index eedca67ad96cb..0000000000000 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions.steps; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.ResultDeduplicator; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.project.TestProjectResolvers; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportRequest; -import org.junit.After; -import org.junit.Before; - -import java.time.Clock; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.datastreams.lifecycle.transitions.steps.ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -public class ForceMergeStepTests extends ESTestCase { - - private ForceMergeStep forceMergeStep; - private ProjectId projectId; - private String indexName; - private String indexUuid; - private Index index; - private ThreadPool threadPool; - private Client client; - private DataStreamLifecycleErrorStore errorStore; - private ResultDeduplicator, Void> deduplicator; - private AtomicReference> capturedListener; - private AtomicReference capturedRequest; - private AtomicReference> capturedForceMergeListener; - private AtomicReference capturedForceMergeRequest; - - @Before - public void setup() { - threadPool = new TestThreadPool(getTestName()); - forceMergeStep = new ForceMergeStep(); - projectId = randomProjectIdOrDefault(); - indexName = randomAlphaOfLength(10); - indexUuid = randomAlphaOfLength(10); - index = new Index(indexName, indexUuid); - errorStore = new DataStreamLifecycleErrorStore(System::currentTimeMillis); - deduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); - capturedListener = new AtomicReference<>(); - capturedRequest = new AtomicReference<>(); - capturedForceMergeListener = new AtomicReference<>(); - capturedForceMergeRequest = new AtomicReference<>(); - - client = new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) { - @Override - @SuppressWarnings("unchecked") - protected void doExecute( - ActionType action, - Request request, - ActionListener listener - ) { - if (request instanceof UpdateSettingsRequest) { - capturedRequest.set((UpdateSettingsRequest) request); - capturedListener.set((ActionListener) listener); - } else if (request instanceof ForceMergeRequest) { - capturedForceMergeRequest.set((ForceMergeRequest) request); - capturedForceMergeListener.set((ActionListener) listener); - } - } - }; - } - - @After - public void tearDown() throws Exception { - terminate(threadPool); - super.tearDown(); - } - - public void testStepCompletedWhenForceMergeSettingIsTrue() { - ProjectState projectState = createProjectStateWithSetting(true); - assertTrue(forceMergeStep.stepCompleted(index, projectState)); - } - - public void testStepNotCompletedWhenForceMergeSettingIsFalse() { - ProjectState projectState = createProjectStateWithSetting(false); - assertFalse(forceMergeStep.stepCompleted(index, projectState)); - } - - public void testStepNotCompletedWhenForceMergeSettingIsAbsent() { - ProjectState projectState = createProjectState(); - assertFalse(forceMergeStep.stepCompleted(index, projectState)); - } - - public void testMarkDLMForceMergeCompleteHappyCase() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - forceMergeStep.markDLMForceMergeComplete(stepContext, ActionListener.noop()); - - assertThat(capturedRequest.get(), is(notNullValue())); - assertThat(capturedRequest.get().indices(), is(notNullValue())); - assertThat(capturedRequest.get().indices().length, is(1)); - assertThat(capturedRequest.get().indices()[0], is(indexName)); - - Settings settings = capturedRequest.get().settings(); - assertThat(DLM_FORCE_MERGE_COMPLETE_SETTING.get(settings), is(true)); - } - - public void testMaybeForceMergeSubmitsForceMergeRequest() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - forceMergeStep.maybeForceMerge(stepContext); - - assertThat(capturedForceMergeRequest.get(), is(notNullValue())); - assertThat(capturedForceMergeRequest.get().indices().length, is(1)); - assertThat(capturedForceMergeRequest.get().indices()[0], is(indexName)); - assertThat(capturedForceMergeRequest.get().maxNumSegments(), is(1)); - } - - public void testMaybeForceMergeSkipsWhenIndexNotInMetadata() { - ProjectState projectState = buildProjectState(null); - DlmStepContext stepContext = createStepContext(projectState); - - forceMergeStep.maybeForceMerge(stepContext); - - assertThat(capturedForceMergeRequest.get(), is(nullValue())); - } - - public void testMaybeForceMergeSkipsWhenAlreadyComplete() { - ProjectState projectState = createProjectStateWithSetting(true); - DlmStepContext stepContext = createStepContext(projectState); - - forceMergeStep.maybeForceMerge(stepContext); - - assertThat(capturedForceMergeRequest.get(), is(nullValue())); - } - - public void testMaybeForceMergeSuccessClearsErrorRecord() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - // Pre-populate the error store so we can verify it gets cleared on success - errorStore.recordError(projectId, indexName, new RuntimeException("previous error")); - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - - forceMergeStep.maybeForceMerge(stepContext); - - BroadcastResponse response = new BroadcastResponse(1, 1, 0, List.of()); - capturedForceMergeListener.get().onResponse(response); - - // After the force merge succeeds, forceMerge() calls markDLMForceMergeComplete() which submits - // an UpdateSettingsRequest. The deduplicator's ErrorRecordingActionListener only fires once - // that nested request completes, so we must complete it here. - assertThat(capturedListener.get(), is(notNullValue())); - capturedListener.get().onResponse(AcknowledgedResponse.TRUE); - - // ErrorRecordingActionListener.onResponse clears the error record - assertThat(errorStore.getError(projectId, indexName), is(nullValue())); - } - - public void testMaybeForceMergeRecordsErrorOnListenerFailure() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - forceMergeStep.maybeForceMerge(stepContext); - - RuntimeException failure = new RuntimeException("force merge transport failure"); - capturedForceMergeListener.get().onFailure(failure); - - // The deduplicator's ErrorRecordingActionListener should have stored the error - var errorRecord = errorStore.getError(projectId, indexName); - assertNotNull(errorRecord); - assertThat(errorRecord.error(), containsString("force merge transport failure")); - } - - public void testForceMergeFailsWhenShardsHaveFailures() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName); - - AtomicReference capturedFailure = new AtomicReference<>(); - forceMergeStep.forceMerge(projectId, forceMergeRequest, ActionListener.wrap(v -> { - throw new AssertionError("expected failure but got success"); - }, capturedFailure::set), stepContext); - - DefaultShardOperationFailedException shardFailure = new DefaultShardOperationFailedException( - indexName, - 0, - new IllegalStateException("shard merge failed") - ); - BroadcastResponse response = new BroadcastResponse(1, 0, 1, List.of(shardFailure)); - capturedForceMergeListener.get().onResponse(response); - - assertThat(capturedFailure.get(), is(notNullValue())); - assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); - assertThat(capturedFailure.get().getMessage(), containsString(indexName)); - assertThat(capturedFailure.get().getMessage(), containsString("DLM failed to force merge")); - } - - public void testForceMergeFailsWhenShardsPartiallySuccessful() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName); - AtomicReference capturedFailure = new AtomicReference<>(); - forceMergeStep.forceMerge(projectId, forceMergeRequest, ActionListener.wrap(v -> { - throw new AssertionError("expected failure but got success"); - }, capturedFailure::set), stepContext); - BroadcastResponse response = new BroadcastResponse(5, 3, 0, List.of()); - capturedForceMergeListener.get().onResponse(response); - assertThat(capturedFailure.get(), is(notNullValue())); - assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); - assertThat(capturedFailure.get().getMessage(), containsString(indexName)); - assertThat(capturedFailure.get().getMessage(), containsString("shards were unavailable")); - } - - private ProjectState createProjectState() { - return buildProjectState(Settings.EMPTY); - } - - private ProjectState createProjectStateWithSetting(boolean forceMergeComplete) { - return buildProjectState(Settings.builder().put(DLM_FORCE_MERGE_COMPLETE_SETTING.getKey(), forceMergeComplete).build()); - } - - /** - * Builds a {@link ProjectState} for the test index. Pass {@code null} to omit the index from metadata entirely. - */ - private ProjectState buildProjectState(@Nullable Settings indexSettings) { - ProjectMetadata.Builder projectMetadata = ProjectMetadata.builder(projectId); - if (indexSettings != null) { - projectMetadata.put(buildIndexMetadata(indexSettings), false); - } - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadata.build()).build(); - return clusterState.projectState(projectId); - } - - private IndexMetadata buildIndexMetadata(Settings additionalSettings) { - return IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) - .put(IndexMetadata.SETTING_INDEX_UUID, indexUuid) - .put(additionalSettings) - .build() - ) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - } - - private DlmStepContext createStepContext(ProjectState projectState) { - return new DlmStepContext(index, projectState, deduplicator, errorStore, randomIntBetween(1, 10), client, Clock.systemUTC()); - } -} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ReadOnlyStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ReadOnlyStepTests.java deleted file mode 100644 index a60ba2f82e545..0000000000000 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ReadOnlyStepTests.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.datastreams.lifecycle.transitions.steps; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.ResultDeduplicator; -import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; -import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.block.ClusterBlock; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.project.TestProjectResolvers; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; -import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportRequest; -import org.junit.After; -import org.junit.Before; - -import java.time.Clock; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -public class ReadOnlyStepTests extends ESTestCase { - - private ReadOnlyStep readOnlyStep; - private ProjectId projectId; - private String indexName; - private Index index; - private ThreadPool threadPool; - private Client client; - private DataStreamLifecycleErrorStore errorStore; - private ResultDeduplicator, Void> deduplicator; - private AtomicReference> capturedListener; - private AtomicReference capturedRequest; - - @Before - public void setup() { - threadPool = new TestThreadPool(getTestName()); - readOnlyStep = new ReadOnlyStep(); - projectId = randomProjectIdOrDefault(); - indexName = randomAlphaOfLength(10); - index = new Index(indexName, randomAlphaOfLength(10)); - errorStore = new DataStreamLifecycleErrorStore(System::currentTimeMillis); - deduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); - capturedListener = new AtomicReference<>(); - capturedRequest = new AtomicReference<>(); - - client = new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) { - @Override - @SuppressWarnings("unchecked") - protected void doExecute( - ActionType action, - Request request, - ActionListener listener - ) { - if (request instanceof AddIndexBlockRequest) { - capturedRequest.set((AddIndexBlockRequest) request); - capturedListener.set((ActionListener) listener); - } - } - }; - } - - @After - public void cleanup() { - threadPool.shutdownNow(); - } - - public void testStepCompletedWhenIndexHasWriteBlock() { - ClusterBlock writeBlock = WRITE.getBlock(); - - ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectId) - .put( - IndexMetadata.builder(indexName) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) - .numberOfShards(1) - .numberOfReplicas(0) - .build(), - false - ); - - ClusterBlocks clusterBlocks = ClusterBlocks.builder().addIndexBlock(projectId, indexName, writeBlock).build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(projectMetadataBuilder) - .blocks(clusterBlocks) - .build(); - ProjectState projectState = clusterState.projectState(projectId); - - assertTrue(readOnlyStep.stepCompleted(index, projectState)); - } - - public void testStepNotCompletedWhenIndexHasNoWriteBlock() { - ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectId) - .put( - IndexMetadata.builder(indexName) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) - .numberOfShards(1) - .numberOfReplicas(0) - .build(), - false - ); - - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadataBuilder).build(); - ProjectState projectState = clusterState.projectState(projectId); - - assertFalse(readOnlyStep.stepCompleted(index, projectState)); - } - - public void testExecuteCallsAddBlockWithCorrectParameters() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - assertThat(capturedListener.get(), is(notNullValue())); - - // Simulate successful response - AddIndexBlockResponse response = new AddIndexBlockResponse(true, true, List.of(new AddIndexBlockResponse.AddBlockResult(index))); - capturedListener.get().onResponse(response); - - // Error should be empty - assertThat(errorStore.getError(projectId, indexName), is(nullValue())); - } - - public void testExecuteWithAcknowledgedResponseClearsError() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - // Pre-populate error store - errorStore.recordError(projectId, indexName, new RuntimeException("previous error")); - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - - readOnlyStep.execute(stepContext); - - AddIndexBlockResponse response = new AddIndexBlockResponse(true, true, List.of(new AddIndexBlockResponse.AddBlockResult(index))); - capturedListener.get().onResponse(response); - - // Error should be cleared - assertThat(errorStore.getError(projectId, indexName), is(nullValue())); - } - - public void testExecuteWithUnacknowledgedResponseRecordsError() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - // Unacknowledged response without index result - AddIndexBlockResponse response = new AddIndexBlockResponse(false, false, Collections.emptyList()); - capturedListener.get().onResponse(response); - - // Error should be recorded - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("not acknowledged")); - } - - public void testExecuteWithUnacknowledgedResponseWithIndexResult() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - // Unacknowledged response with index result but no failures - AddIndexBlockResponse response = new AddIndexBlockResponse(false, false, List.of(new AddIndexBlockResponse.AddBlockResult(index))); - capturedListener.get().onResponse(response); - - // Error should be recorded - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("not acknowledged")); - } - - public void testExecuteWithGlobalExceptionInBlockResult() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - ElasticsearchException exception = new ElasticsearchException("global failure"); - AddIndexBlockResponse response = new AddIndexBlockResponse( - false, - false, - List.of(new AddIndexBlockResponse.AddBlockResult(index, exception)) - ); - capturedListener.get().onResponse(response); - - // Error should be recorded - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("global failure")); - } - - public void testExecuteWithShardFailuresInBlockResult() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - AddIndexBlockResponse.AddBlockShardResult.Failure shardFailure = new AddIndexBlockResponse.AddBlockShardResult.Failure( - indexName, - 0, - new ElasticsearchException("shard failure") - ); - - AddIndexBlockResponse.AddBlockShardResult[] shardResults = new AddIndexBlockResponse.AddBlockShardResult[] { - new AddIndexBlockResponse.AddBlockShardResult(0, new AddIndexBlockResponse.AddBlockShardResult.Failure[] { shardFailure }) }; - - AddIndexBlockResponse response = new AddIndexBlockResponse( - false, - false, - List.of(new AddIndexBlockResponse.AddBlockResult(index, shardResults)) - ); - capturedListener.get().onResponse(response); - - // Error should be recorded with shard failure details - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("shard failure")); - } - - public void testExecuteWithIndexNotFoundExceptionClearsError() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - // Pre-populate error store - errorStore.recordError(projectId, indexName, new RuntimeException("previous error")); - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - - readOnlyStep.execute(stepContext); - - capturedListener.get().onFailure(new IndexNotFoundException(indexName)); - - // Error should be cleared since index was deleted - assertThat(errorStore.getError(projectId, indexName), is(nullValue())); - } - - public void testExecuteWithGenericFailureRecordsError() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - capturedListener.get().onFailure(new ElasticsearchException("some error")); - - // Error should be recorded - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("some error")); - } - - public void testStepCompletedWithDifferentBlocks() { - // Test that only WRITE block makes the step completed - ClusterBlock readBlock = IndexMetadata.APIBlock.READ.getBlock(); - - ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectId) - .put( - IndexMetadata.builder(indexName) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) - .numberOfShards(1) - .numberOfReplicas(0) - .build(), - false - ); - - ClusterBlocks clusterBlocks = ClusterBlocks.builder().addIndexBlock(projectId, indexName, readBlock).build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(projectMetadataBuilder) - .blocks(clusterBlocks) - .build(); - ProjectState projectState = clusterState.projectState(projectId); - - // READ block should not make the step completed, only WRITE block should - assertFalse(readOnlyStep.stepCompleted(index, projectState)); - } - - public void testExecuteWithMultipleShardFailures() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - AddIndexBlockResponse.AddBlockShardResult.Failure shardFailure1 = new AddIndexBlockResponse.AddBlockShardResult.Failure( - indexName, - 0, - new ElasticsearchException("shard 0 failure") - ); - - AddIndexBlockResponse.AddBlockShardResult.Failure shardFailure2 = new AddIndexBlockResponse.AddBlockShardResult.Failure( - indexName, - 1, - new ElasticsearchException("shard 1 failure") - ); - - AddIndexBlockResponse.AddBlockShardResult[] shardResults = new AddIndexBlockResponse.AddBlockShardResult[] { - new AddIndexBlockResponse.AddBlockShardResult(0, new AddIndexBlockResponse.AddBlockShardResult.Failure[] { shardFailure1 }), - new AddIndexBlockResponse.AddBlockShardResult(1, new AddIndexBlockResponse.AddBlockShardResult.Failure[] { shardFailure2 }) }; - - AddIndexBlockResponse response = new AddIndexBlockResponse( - false, - false, - List.of(new AddIndexBlockResponse.AddBlockResult(index, shardResults)) - ); - capturedListener.get().onResponse(response); - - // Error should be recorded with both shard failures - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - String errorMessage = Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(); - assertThat(errorMessage, containsString("shard 0 failure")); - assertThat(errorMessage, containsString("shard 1 failure")); - } - - public void testExecuteWithShardResultsButNoFailures() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - // Create shard results with no failures (empty failure arrays) - AddIndexBlockResponse.AddBlockShardResult[] shardResults = new AddIndexBlockResponse.AddBlockShardResult[] { - new AddIndexBlockResponse.AddBlockShardResult(0, new AddIndexBlockResponse.AddBlockShardResult.Failure[0]), - new AddIndexBlockResponse.AddBlockShardResult(1, new AddIndexBlockResponse.AddBlockShardResult.Failure[0]) }; - - // Response is not acknowledged, has index result with shards, but no failures - AddIndexBlockResponse response = new AddIndexBlockResponse( - false, - false, - List.of(new AddIndexBlockResponse.AddBlockResult(index, shardResults)) - ); - capturedListener.get().onResponse(response); - - // Error should be recorded because response was not acknowledged - assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("not acknowledged")); - } - - public void testAddIndexBlockRequestHasVerifiedSetToTrue() { - ProjectState projectState = createProjectState(); - DlmStepContext stepContext = createStepContext(projectState); - - readOnlyStep.execute(stepContext); - - assertThat(capturedRequest.get(), is(notNullValue())); - assertTrue("AddIndexBlockRequest should have verified set to true", capturedRequest.get().markVerified()); - } - - private ProjectState createProjectState() { - ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectId) - .put( - IndexMetadata.builder(indexName) - .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) - .numberOfShards(1) - .numberOfReplicas(0) - .build(), - false - ); - - ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadataBuilder); - - return clusterStateBuilder.build().projectState(projectId); - } - - private DlmStepContext createStepContext(ProjectState projectState) { - return new DlmStepContext(index, projectState, deduplicator, errorStore, randomIntBetween(1, 10), client, Clock.systemUTC()); - } -}