From 31d6c0d7d4a2c095453d3f3b46a54f18407b851a Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Thu, 26 Feb 2026 23:00:17 -0500 Subject: [PATCH 01/16] Add DLM force merge operation helpers --- .../lifecycle/DataStreamLifecycleService.java | 4 +- .../transitions/steps/ForceMergeStep.java | 60 +++++++++++++ .../steps/ForceMergeStepTests.java | 86 ++++++++++++++++++- 3 files changed, 146 insertions(+), 4 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 5edf99afd83ad..3765246474361 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -1817,8 +1817,8 @@ public void markIndexForDlmForceMerge( * care about it for data stream lifecycle deduplication. This class is non-private for the sake of unit testing, but should not be used * outside of Data Stream Lifecycle Service. */ - static final class ForceMergeRequestWrapper extends ForceMergeRequest { - ForceMergeRequestWrapper(ForceMergeRequest original) { + public static final class ForceMergeRequestWrapper extends ForceMergeRequest { + public ForceMergeRequestWrapper(ForceMergeRequest original) { super(original.indices()); this.maxNumSegments(original.maxNumSegments()); this.onlyExpungeDeletes(original.onlyExpungeDeletes()); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 65279afd2af89..bf7996b1cd71c 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -9,18 +9,26 @@ package org.elasticsearch.datastreams.lifecycle.transitions.steps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; +import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; import org.elasticsearch.index.Index; +import java.util.Arrays; import java.util.Optional; /** @@ -41,6 +49,8 @@ public class ForceMergeStep implements DlmStep { private static final Settings FORCE_MERGE_COMPLETE_SETTINGS = Settings.builder() .put(DLM_FORCE_MERGE_COMPLETE_SETTING.getKey(), true) .build(); + private static final int SINGLE_SEGMENT = 1; + private static final Logger logger = LogManager.getLogger(ForceMergeStep.class); /** * Determines if the step has been completed for the given index and project state. @@ -122,6 +132,56 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe ); } + void maybeForceMerge(String index, DlmStepContext stepContext) { + ForceMergeRequest forceMergeRequest = formForceMergeRequest(index); + stepContext.executeDeduplicatedRequest( + ForceMergeAction.NAME, + forceMergeRequest, + Strings.format("DLM service encountered an error trying to force merge index [%s]", index), + (req, l) -> forceMerge(stepContext.projectId(), forceMergeRequest, l, stepContext) + ); + } + + protected void forceMerge( + ProjectId projectId, + ForceMergeRequest forceMergeRequest, + ActionListener listener, + DlmStepContext stepContext + ) { + logger.debug("DLM attempting to force merge index [{}]", stepContext.indexName()); + stepContext.client() + .projectClient(projectId) + .admin() + .indices() + .forceMerge(forceMergeRequest, listener.delegateFailureAndWrap((l, response) -> { + if (response.getFailedShards() == 0) { + logger.debug("DLM successfully force merged index [{}]", stepContext.indexName()); + l.onResponse(null); + } else { + DefaultShardOperationFailedException[] failures = response.getShardFailures(); + String errorMessage = Strings.format( + "DLM failed while force merging index [%s] with the following failures: [%s]", + stepContext.indexName(), + failures == null + ? "n/a" + : org.elasticsearch.common.Strings.collectionToDelimitedString( + Arrays.stream(failures).map(org.elasticsearch.common.Strings::toString).toList(), + "," + ) + ); + logger.warn(errorMessage); + ElasticsearchException e = new ElasticsearchException(errorMessage); + l.onFailure(e); + } + })); + } + + private ForceMergeRequest formForceMergeRequest(String index) { + ForceMergeRequest req = new ForceMergeRequest(index); + req.maxNumSegments(SINGLE_SEGMENT); + return new DataStreamLifecycleService.ForceMergeRequestWrapper(req); + } + /** * A human-readable name for the step. * diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index edf3012059b8e..67dad79385241 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -9,12 +9,16 @@ package org.elasticsearch.datastreams.lifecycle.transitions.steps; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ResultDeduplicator; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; @@ -37,13 +41,17 @@ import org.elasticsearch.transport.TransportRequest; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; import java.time.Clock; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.datastreams.lifecycle.transitions.steps.ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class ForceMergeStepTests extends ESTestCase { @@ -58,6 +66,8 @@ public class ForceMergeStepTests extends ESTestCase { private ResultDeduplicator, Void> deduplicator; private AtomicReference> capturedListener; private AtomicReference capturedRequest; + private AtomicReference> capturedForceMergeListener; + private AtomicReference capturedForceMergeRequest; @Before public void setup() { @@ -71,6 +81,8 @@ public void setup() { deduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); capturedListener = new AtomicReference<>(); capturedRequest = new AtomicReference<>(); + capturedForceMergeListener = new AtomicReference<>(); + capturedForceMergeRequest = new AtomicReference<>(); client = new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) { @Override @@ -83,6 +95,9 @@ protected void if (request instanceof UpdateSettingsRequest) { capturedRequest.set((UpdateSettingsRequest) request); capturedListener.set((ActionListener) listener); + } else if (request instanceof ForceMergeRequest) { + capturedForceMergeRequest.set((ForceMergeRequest) request); + capturedForceMergeListener.set((ActionListener) listener); } } }; @@ -124,8 +139,75 @@ public void testMarkDLMForceMergeCompleteHappyCase() { assertThat(DLM_FORCE_MERGE_COMPLETE_SETTING.get(settings), is(true)); } - public void testStepName() { - assertThat(forceMergeStep.stepName(), is("Force Merge Index")); + public void testMaybeForceMergeSubmitsForceMergeRequest() { + ProjectState projectState = createProjectState(); + DlmStepContext stepContext = createStepContext(projectState); + + forceMergeStep.maybeForceMerge(indexName, stepContext); + + assertThat(capturedForceMergeRequest.get(), is(notNullValue())); + assertThat(capturedForceMergeRequest.get().indices().length, is(1)); + assertThat(capturedForceMergeRequest.get().indices()[0], is(indexName)); + assertThat(capturedForceMergeRequest.get().maxNumSegments(), is(1)); + } + + public void testMaybeForceMergeSuccessClearsErrorRecord() { + ProjectState projectState = createProjectState(); + DlmStepContext stepContext = createStepContext(projectState); + + // Pre-populate the error store so we can verify it gets cleared on success + errorStore.recordError(projectId, indexName, new RuntimeException("previous error")); + assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); + + forceMergeStep.maybeForceMerge(indexName, stepContext); + + BroadcastResponse response = Mockito.mock(BroadcastResponse.class); + Mockito.when(response.getFailedShards()).thenReturn(0); + capturedForceMergeListener.get().onResponse(response); + + // ErrorRecordingActionListener.onResponse clears the error record + assertThat(errorStore.getError(projectId, indexName), is(nullValue())); + } + + public void testMaybeForceMergeRecordsErrorOnListenerFailure() { + ProjectState projectState = createProjectState(); + DlmStepContext stepContext = createStepContext(projectState); + + forceMergeStep.maybeForceMerge(indexName, stepContext); + + RuntimeException failure = new RuntimeException("force merge transport failure"); + capturedForceMergeListener.get().onFailure(failure); + + // The deduplicator's ErrorRecordingActionListener should have stored the error + var errorRecord = errorStore.getError(projectId, indexName); + assertNotNull(errorRecord); + assertThat(errorRecord.error(), containsString("force merge transport failure")); + } + + public void testForceMergeFailsWhenShardsHaveFailures() { + ProjectState projectState = createProjectState(); + DlmStepContext stepContext = createStepContext(projectState); + ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName); + + AtomicReference capturedFailure = new AtomicReference<>(); + forceMergeStep.forceMerge(projectId, forceMergeRequest, ActionListener.wrap(v -> { + throw new AssertionError("expected failure but got success"); + }, capturedFailure::set), stepContext); + + DefaultShardOperationFailedException shardFailure = new DefaultShardOperationFailedException( + indexName, + 0, + new IllegalStateException("shard merge failed") + ); + BroadcastResponse response = Mockito.mock(BroadcastResponse.class); + Mockito.when(response.getFailedShards()).thenReturn(1); + Mockito.when(response.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[] { shardFailure }); + capturedForceMergeListener.get().onResponse(response); + + assertThat(capturedFailure.get(), is(notNullValue())); + assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); + assertThat(capturedFailure.get().getMessage(), containsString(indexName)); + assertThat(capturedFailure.get().getMessage(), containsString("DLM failed while force merging")); } private ProjectState createProjectState() { From 2731ae7852e8a1ef6f7156097233d4bcbeab4ade Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Thu, 26 Feb 2026 23:32:04 -0500 Subject: [PATCH 02/16] adapt data lifecycleservice force merge code --- .../transitions/steps/ForceMergeStep.java | 53 ++++++++++++------- .../steps/ForceMergeStepTests.java | 28 ++++++++++ 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index bf7996b1cd71c..0a51edad7c45e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Optional; +import java.util.stream.Collectors; /** * A DLM step responsible for force merging the index. @@ -132,6 +133,11 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe ); } + /** + * Helper method to execute the force merge request for the given index. This method forms the request and uses the + * step context to execute it in a deduplicated manner. The actual execution of the force merge request is + * delegated to the {@link #forceMerge} method. + */ void maybeForceMerge(String index, DlmStepContext stepContext) { ForceMergeRequest forceMergeRequest = formForceMergeRequest(index); stepContext.executeDeduplicatedRequest( @@ -142,36 +148,47 @@ void maybeForceMerge(String index, DlmStepContext stepContext) { ); } + /** This method executes the given force merge request. Once the request has completed successfully it updates + * the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} in the cluster state indicating that the force merge has completed. + * The listener is notified after the cluster state update has been made, or when the force merge fails or the + * update to the cluster state fails. + */ protected void forceMerge( ProjectId projectId, ForceMergeRequest forceMergeRequest, ActionListener listener, DlmStepContext stepContext ) { - logger.debug("DLM attempting to force merge index [{}]", stepContext.indexName()); + assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1 : "DLM force merges one index at a time"; + final String targetIndex = forceMergeRequest.indices()[0]; + logger.info("DLM is issuing a request to force merge index [{}]", targetIndex); stepContext.client() .projectClient(projectId) .admin() .indices() - .forceMerge(forceMergeRequest, listener.delegateFailureAndWrap((l, response) -> { - if (response.getFailedShards() == 0) { - logger.debug("DLM successfully force merged index [{}]", stepContext.indexName()); - l.onResponse(null); - } else { - DefaultShardOperationFailedException[] failures = response.getShardFailures(); - String errorMessage = Strings.format( - "DLM failed while force merging index [%s] with the following failures: [%s]", - stepContext.indexName(), + .forceMerge(forceMergeRequest, listener.delegateFailureAndWrap((l, forceMergeResponse) -> { + if (forceMergeResponse.getFailedShards() > 0) { + DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures(); + String message = Strings.format( + "DLM failed to force merge %d shards for index [%s] due to failures [%s]", + forceMergeResponse.getFailedShards(), + targetIndex, failures == null - ? "n/a" - : org.elasticsearch.common.Strings.collectionToDelimitedString( - Arrays.stream(failures).map(org.elasticsearch.common.Strings::toString).toList(), - "," - ) + ? "unknown" + : Arrays.stream(failures).map(DefaultShardOperationFailedException::toString).collect(Collectors.joining(",")) + ); + l.onFailure(new ElasticsearchException(message)); + } else if (forceMergeResponse.getTotalShards() != forceMergeResponse.getSuccessfulShards()) { + String message = Strings.format( + "DLM failed while force merging index [%s]: only %d out of %d shards succeeded", + targetIndex, + forceMergeResponse.getSuccessfulShards(), + forceMergeResponse.getTotalShards() ); - logger.warn(errorMessage); - ElasticsearchException e = new ElasticsearchException(errorMessage); - l.onFailure(e); + l.onFailure(new ElasticsearchException(message)); + } else { + logger.info("DLM successfully force merged index [{}]", targetIndex); + markDLMForceMergeComplete(stepContext, listener); } })); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index 67dad79385241..ec6d5af83b70a 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -163,8 +163,16 @@ public void testMaybeForceMergeSuccessClearsErrorRecord() { BroadcastResponse response = Mockito.mock(BroadcastResponse.class); Mockito.when(response.getFailedShards()).thenReturn(0); + Mockito.when(response.getTotalShards()).thenReturn(1); + Mockito.when(response.getSuccessfulShards()).thenReturn(1); capturedForceMergeListener.get().onResponse(response); + // After the force merge succeeds, forceMerge() calls markDLMForceMergeComplete() which submits + // an UpdateSettingsRequest. The deduplicator's ErrorRecordingActionListener only fires once + // that nested request completes, so we must complete it here. + assertThat(capturedListener.get(), is(notNullValue())); + capturedListener.get().onResponse(AcknowledgedResponse.TRUE); + // ErrorRecordingActionListener.onResponse clears the error record assertThat(errorStore.getError(projectId, indexName), is(nullValue())); } @@ -204,6 +212,26 @@ public void testForceMergeFailsWhenShardsHaveFailures() { Mockito.when(response.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[] { shardFailure }); capturedForceMergeListener.get().onResponse(response); + assertThat(capturedFailure.get(), is(notNullValue())); + assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); + assertThat(capturedFailure.get().getMessage(), containsString(indexName)); + assertThat(capturedFailure.get().getMessage(), containsString("DLM failed to force merge")); + } + + public void testForceMergeFailsWhenShardsPartiallySuccessful() { + ProjectState projectState = createProjectState(); + DlmStepContext stepContext = createStepContext(projectState); + ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName); + AtomicReference capturedFailure = new AtomicReference<>(); + forceMergeStep.forceMerge(projectId, forceMergeRequest, ActionListener.wrap(v -> { + throw new AssertionError("expected failure but got success"); + }, capturedFailure::set), stepContext); + BroadcastResponse response = Mockito.mock(BroadcastResponse.class); + Mockito.when(response.getTotalShards()).thenReturn(5); + Mockito.when(response.getSuccessfulShards()).thenReturn(3); + Mockito.when(response.getFailedShards()).thenReturn(0); + Mockito.when(response.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[0]); + capturedForceMergeListener.get().onResponse(response); assertThat(capturedFailure.get(), is(notNullValue())); assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); assertThat(capturedFailure.get().getMessage(), containsString(indexName)); From 9d0a1c832ceba963ba244fa15bb2f012a2d25752 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Fri, 27 Feb 2026 11:11:51 -0500 Subject: [PATCH 03/16] Update modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java Co-authored-by: Luke Whiting --- .../lifecycle/transitions/steps/ForceMergeStep.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 0a51edad7c45e..95c394067b6c3 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -148,7 +148,8 @@ void maybeForceMerge(String index, DlmStepContext stepContext) { ); } - /** This method executes the given force merge request. Once the request has completed successfully it updates + /** + * This method executes the given force merge request. Once the request has completed successfully it updates * the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} in the cluster state indicating that the force merge has completed. * The listener is notified after the cluster state update has been made, or when the force merge fails or the * update to the cluster state fails. From 69aa5c4377fd580bfb628fb67e357d546bf04e4e Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Fri, 27 Feb 2026 11:17:40 -0500 Subject: [PATCH 04/16] PR feedback --- .../datastreams/lifecycle/DataStreamLifecycleService.java | 5 +++-- .../lifecycle/transitions/steps/ForceMergeStepTests.java | 6 ++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 3765246474361..e6bf6278a24d1 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -1814,8 +1814,9 @@ public void markIndexForDlmForceMerge( /** * This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator. * It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't - * care about it for data stream lifecycle deduplication. This class is non-private for the sake of unit testing, but should not be used - * outside of Data Stream Lifecycle Service. + * care about it for data stream lifecycle deduplication. This class is public so that it can be reused by other data stream lifecycle + * management components (for example when forming DLM force-merge requests), but it is not intended as a + * general-purpose Elasticsearch public API. */ public static final class ForceMergeRequestWrapper extends ForceMergeRequest { public ForceMergeRequestWrapper(ForceMergeRequest original) { diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index ec6d5af83b70a..e8fa4bff4b534 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -44,6 +44,7 @@ import org.mockito.Mockito; import java.time.Clock; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.datastreams.lifecycle.transitions.steps.ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING; @@ -161,10 +162,7 @@ public void testMaybeForceMergeSuccessClearsErrorRecord() { forceMergeStep.maybeForceMerge(indexName, stepContext); - BroadcastResponse response = Mockito.mock(BroadcastResponse.class); - Mockito.when(response.getFailedShards()).thenReturn(0); - Mockito.when(response.getTotalShards()).thenReturn(1); - Mockito.when(response.getSuccessfulShards()).thenReturn(1); + BroadcastResponse response = new BroadcastResponse(1, 1, 0, List.of()); capturedForceMergeListener.get().onResponse(response); // After the force merge succeeds, forceMerge() calls markDLMForceMergeComplete() which submits From a1c64ef33bf7cde42739e5c36505266354dce4d3 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 27 Feb 2026 16:24:04 +0000 Subject: [PATCH 05/16] [CI] Auto commit changes from spotless --- .../datastreams/lifecycle/transitions/steps/ForceMergeStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 95c394067b6c3..f0595ca853aba 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -148,7 +148,7 @@ void maybeForceMerge(String index, DlmStepContext stepContext) { ); } - /** + /** * This method executes the given force merge request. Once the request has completed successfully it updates * the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} in the cluster state indicating that the force merge has completed. * The listener is notified after the cluster state update has been made, or when the force merge fails or the From 1b4549809d05dadb1e89885c95deaee04dccfb26 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Fri, 27 Feb 2026 15:20:17 -0500 Subject: [PATCH 06/16] implement execute(), add completeness check before running maybeForceMerge() --- .../transitions/steps/ForceMergeStep.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index f0595ca853aba..cd2f09f230ccc 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -69,11 +69,11 @@ public boolean stepCompleted(Index index, ProjectState projectState) { * This method determines how to execute the step and performs the necessary operations to update the index * so that {@link #stepCompleted(Index, ProjectState)} will return true after successful execution. * - * @param dlmStepContext The context and resources for executing the step. + * @param stepContext The context and resources for executing the step. */ @Override - public void execute(DlmStepContext dlmStepContext) { - // Todo: Implement the force merge logic here. + public void execute(DlmStepContext stepContext) { + maybeForceMerge(stepContext.indexName(), stepContext); } /** @@ -136,9 +136,15 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe /** * Helper method to execute the force merge request for the given index. This method forms the request and uses the * step context to execute it in a deduplicated manner. The actual execution of the force merge request is - * delegated to the {@link #forceMerge} method. + * delegated to the {@link #forceMerge} method. Checks if the force merge has already been completed for the + * index before executing and skips execution if so. */ void maybeForceMerge(String index, DlmStepContext stepContext) { + if (isDLMForceMergeComplete(stepContext.index(), stepContext.projectState())) { + logger.info("DLM force merge step is already completed for index [{}], skipping execution", stepContext.indexName()); + return; + } + ForceMergeRequest forceMergeRequest = formForceMergeRequest(index); stepContext.executeDeduplicatedRequest( ForceMergeAction.NAME, @@ -161,6 +167,7 @@ protected void forceMerge( DlmStepContext stepContext ) { assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1 : "DLM force merges one index at a time"; + final String targetIndex = forceMergeRequest.indices()[0]; logger.info("DLM is issuing a request to force merge index [{}]", targetIndex); stepContext.client() From 6b8c255cedc4ad1ec929d964794d2ee8115ba078 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Fri, 27 Feb 2026 15:24:00 -0500 Subject: [PATCH 07/16] add unit test for when force merge is already complete --- .../lifecycle/transitions/steps/ForceMergeStepTests.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index e8fa4bff4b534..3565047497e42 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -152,6 +152,15 @@ public void testMaybeForceMergeSubmitsForceMergeRequest() { assertThat(capturedForceMergeRequest.get().maxNumSegments(), is(1)); } + public void testMaybeForceMergeSkipsWhenAlreadyComplete() { + ProjectState projectState = createProjectStateWithSetting(true); + DlmStepContext stepContext = createStepContext(projectState); + + forceMergeStep.maybeForceMerge(indexName, stepContext); + + assertThat(capturedForceMergeRequest.get(), is(nullValue())); + } + public void testMaybeForceMergeSuccessClearsErrorRecord() { ProjectState projectState = createProjectState(); DlmStepContext stepContext = createStepContext(projectState); From 147b93ce9ba6d76d874ff9ac5ce30d1c475fe712 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Fri, 27 Feb 2026 15:36:58 -0500 Subject: [PATCH 08/16] add check for nonexistent index --- .../transitions/steps/ForceMergeStep.java | 11 +++++++++ .../steps/ForceMergeStepTests.java | 24 +++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index cd2f09f230ccc..30773051eb972 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -140,6 +141,16 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe * index before executing and skips execution if so. */ void maybeForceMerge(String index, DlmStepContext stepContext) { + IndexMetadata indexMetadata = Optional.ofNullable(stepContext.projectState()) + .map(ProjectState::metadata) + .map(metadata -> metadata.index(index)) + .orElse(null); + + if (indexMetadata == null) { + logger.warn("Index [{}] not found in project metadata, skipping force merge step", index); + return; + } + if (isDLMForceMergeComplete(stepContext.index(), stepContext.projectState())) { logger.info("DLM force merge step is already completed for index [{}], skipping execution", stepContext.indexName()); return; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index 3565047497e42..966e718b86859 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; @@ -152,6 +153,15 @@ public void testMaybeForceMergeSubmitsForceMergeRequest() { assertThat(capturedForceMergeRequest.get().maxNumSegments(), is(1)); } + public void testMaybeForceMergeSkipsWhenIndexNotInMetadata() { + ProjectState projectState = buildProjectState(null); + DlmStepContext stepContext = createStepContext(projectState); + + forceMergeStep.maybeForceMerge(indexName, stepContext); + + assertThat(capturedForceMergeRequest.get(), is(nullValue())); + } + public void testMaybeForceMergeSkipsWhenAlreadyComplete() { ProjectState projectState = createProjectStateWithSetting(true); DlmStepContext stepContext = createStepContext(projectState); @@ -253,11 +263,15 @@ private ProjectState createProjectStateWithSetting(boolean forceMergeComplete) { return buildProjectState(Settings.builder().put(DLM_FORCE_MERGE_COMPLETE_SETTING.getKey(), forceMergeComplete).build()); } - private ProjectState buildProjectState(Settings additionalSettings) { - IndexMetadata indexMetadata = buildIndexMetadata(additionalSettings); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(ProjectMetadata.builder(projectId).put(indexMetadata, false)) - .build(); + /** + * Builds a {@link ProjectState} for the test index. Pass {@code null} to omit the index from metadata entirely. + */ + private ProjectState buildProjectState(@Nullable Settings indexSettings) { + ProjectMetadata.Builder projectMetadata = ProjectMetadata.builder(projectId); + if (indexSettings != null) { + projectMetadata.put(buildIndexMetadata(indexSettings), false); + } + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadata.build()).build(); return clusterState.projectState(projectId); } From a4e3443a2b25954448aa19ea4bb30d92ac0606d4 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Fri, 27 Feb 2026 15:39:39 -0500 Subject: [PATCH 09/16] update comment --- .../datastreams/lifecycle/transitions/steps/ForceMergeStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 30773051eb972..5104a240817e0 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -138,7 +138,7 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe * Helper method to execute the force merge request for the given index. This method forms the request and uses the * step context to execute it in a deduplicated manner. The actual execution of the force merge request is * delegated to the {@link #forceMerge} method. Checks if the force merge has already been completed for the - * index before executing and skips execution if so. + * index before executing and skips execution if so. Also skips if the index does not exist in the project metadata. */ void maybeForceMerge(String index, DlmStepContext stepContext) { IndexMetadata indexMetadata = Optional.ofNullable(stepContext.projectState()) From ba18299736d40a0d52250b2f070cd0375cd5e687 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Mon, 2 Mar 2026 11:07:25 -0500 Subject: [PATCH 10/16] Update modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java Co-authored-by: Luke Whiting --- .../lifecycle/transitions/steps/ForceMergeStep.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 5104a240817e0..1ee033d08b217 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -141,12 +141,12 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe * index before executing and skips execution if so. Also skips if the index does not exist in the project metadata. */ void maybeForceMerge(String index, DlmStepContext stepContext) { - IndexMetadata indexMetadata = Optional.ofNullable(stepContext.projectState()) + boolean indexMissing = Optional.ofNullable(stepContext.projectState()) .map(ProjectState::metadata) .map(metadata -> metadata.index(index)) - .orElse(null); + .isEmpty(); - if (indexMetadata == null) { + if (indexMissing) { logger.warn("Index [{}] not found in project metadata, skipping force merge step", index); return; } From a2783d151362eccf60fe720aa978da31fcf12ada Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 2 Mar 2026 16:17:10 +0000 Subject: [PATCH 11/16] [CI] Auto commit changes from spotless --- .../datastreams/lifecycle/transitions/steps/ForceMergeStep.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 1ee033d08b217..0156bdc0d7730 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -19,7 +19,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; From def11f4af4ac5f9a4f9d6e6aa2ee7d206e0eea6a Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Mon, 2 Mar 2026 11:38:44 -0500 Subject: [PATCH 12/16] Update modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java Co-authored-by: Lee Hinman --- .../datastreams/lifecycle/transitions/steps/ForceMergeStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 0156bdc0d7730..05db3bab7513e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -179,7 +179,7 @@ protected void forceMerge( assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1 : "DLM force merges one index at a time"; final String targetIndex = forceMergeRequest.indices()[0]; - logger.info("DLM is issuing a request to force merge index [{}]", targetIndex); + logger.info("DLM is issuing a request to force merge index [{}] to a single segment", targetIndex); stepContext.client() .projectClient(projectId) .admin() From da5889795fcf7a9599a43947abfa0eee820e85e6 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Mon, 2 Mar 2026 15:38:21 -0500 Subject: [PATCH 13/16] respond to PR feedback --- .../lifecycle/DataStreamLifecycleService.java | 55 -------------- .../lifecycle/ForceMergeRequestWrapper.java | 71 +++++++++++++++++++ .../transitions/steps/ForceMergeStep.java | 11 +-- .../DataStreamLifecycleServiceTests.java | 22 +++--- .../steps/ForceMergeStepTests.java | 21 ++---- 5 files changed, 93 insertions(+), 87 deletions(-) create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ForceMergeRequestWrapper.java diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 764f24d640b83..cce56dd07600b 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -1842,59 +1842,4 @@ public void markIndexForDlmForceMerge( ); } - /** - * This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator. - * It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't - * care about it for data stream lifecycle deduplication. This class is public so that it can be reused by other data stream lifecycle - * management components (for example when forming DLM force-merge requests), but it is not intended as a - * general-purpose Elasticsearch public API. - */ - public static final class ForceMergeRequestWrapper extends ForceMergeRequest { - public ForceMergeRequestWrapper(ForceMergeRequest original) { - super(original.indices()); - this.maxNumSegments(original.maxNumSegments()); - this.onlyExpungeDeletes(original.onlyExpungeDeletes()); - this.flush(original.flush()); - this.indicesOptions(original.indicesOptions()); - this.setShouldStoreResult(original.getShouldStoreResult()); - this.setRequestId(original.getRequestId()); - this.timeout(original.timeout()); - this.setParentTask(original.getParentTask()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ForceMergeRequest that = (ForceMergeRequest) o; - return Arrays.equals(indices, that.indices()) - && maxNumSegments() == that.maxNumSegments() - && onlyExpungeDeletes() == that.onlyExpungeDeletes() - && flush() == that.flush() - && Objects.equals(indicesOptions(), that.indicesOptions()) - && getShouldStoreResult() == that.getShouldStoreResult() - && getRequestId() == that.getRequestId() - && Objects.equals(timeout(), that.timeout()) - && Objects.equals(getParentTask(), that.getParentTask()); - } - - @Override - public int hashCode() { - return Objects.hash( - Arrays.hashCode(indices), - maxNumSegments(), - onlyExpungeDeletes(), - flush(), - indicesOptions(), - getShouldStoreResult(), - getRequestId(), - timeout(), - getParentTask() - ); - } - } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ForceMergeRequestWrapper.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ForceMergeRequestWrapper.java new file mode 100644 index 0000000000000..64ac78cc25842 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ForceMergeRequestWrapper.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.lifecycle; + +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; + +import java.util.Arrays; +import java.util.Objects; + +/** + * This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator. + * It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't + * care about it for data stream lifecycle deduplication. This class is public so that it can be reused by other data stream lifecycle + * management components (for example when forming DLM force-merge requests), but it is not intended as a + * general-purpose Elasticsearch public API. + */ +public final class ForceMergeRequestWrapper extends ForceMergeRequest { + public ForceMergeRequestWrapper(ForceMergeRequest original) { + super(original.indices()); + this.maxNumSegments(original.maxNumSegments()); + this.onlyExpungeDeletes(original.onlyExpungeDeletes()); + this.flush(original.flush()); + this.indicesOptions(original.indicesOptions()); + this.setShouldStoreResult(original.getShouldStoreResult()); + this.setRequestId(original.getRequestId()); + this.timeout(original.timeout()); + this.setParentTask(original.getParentTask()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ForceMergeRequest that = (ForceMergeRequest) o; + return Arrays.equals(indices, that.indices()) + && maxNumSegments() == that.maxNumSegments() + && onlyExpungeDeletes() == that.onlyExpungeDeletes() + && flush() == that.flush() + && Objects.equals(indicesOptions(), that.indicesOptions()) + && getShouldStoreResult() == that.getShouldStoreResult() + && getRequestId() == that.getRequestId() + && Objects.equals(timeout(), that.timeout()) + && Objects.equals(getParentTask(), that.getParentTask()); + } + + @Override + public int hashCode() { + return Objects.hash( + Arrays.hashCode(indices), + maxNumSegments(), + onlyExpungeDeletes(), + flush(), + indicesOptions(), + getShouldStoreResult(), + getRequestId(), + timeout(), + getParentTask() + ); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 05db3bab7513e..441daf556ae68 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.datastreams.lifecycle.ForceMergeRequestWrapper; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; import org.elasticsearch.index.Index; @@ -73,7 +73,7 @@ public boolean stepCompleted(Index index, ProjectState projectState) { */ @Override public void execute(DlmStepContext stepContext) { - maybeForceMerge(stepContext.indexName(), stepContext); + maybeForceMerge(stepContext); } /** @@ -139,7 +139,8 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe * delegated to the {@link #forceMerge} method. Checks if the force merge has already been completed for the * index before executing and skips execution if so. Also skips if the index does not exist in the project metadata. */ - void maybeForceMerge(String index, DlmStepContext stepContext) { + void maybeForceMerge(DlmStepContext stepContext) { + Index index = stepContext.index(); boolean indexMissing = Optional.ofNullable(stepContext.projectState()) .map(ProjectState::metadata) .map(metadata -> metadata.index(index)) @@ -155,7 +156,7 @@ void maybeForceMerge(String index, DlmStepContext stepContext) { return; } - ForceMergeRequest forceMergeRequest = formForceMergeRequest(index); + ForceMergeRequest forceMergeRequest = formForceMergeRequest(index.getName()); stepContext.executeDeduplicatedRequest( ForceMergeAction.NAME, forceMergeRequest, @@ -214,7 +215,7 @@ protected void forceMerge( private ForceMergeRequest formForceMergeRequest(String index) { ForceMergeRequest req = new ForceMergeRequest(index); req.maxNumSegments(SINGLE_SEGMENT); - return new DataStreamLifecycleService.ForceMergeRequestWrapper(req); + return new ForceMergeRequestWrapper(req); } /** diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 97bc96d2950ce..5153657a1fa2d 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -1099,7 +1099,7 @@ public void testForceMergeRequestWrapperEqualsHashCode() { originalRequest.onlyExpungeDeletes(randomBoolean()); originalRequest.flush(randomBoolean()); EqualsHashCodeTestUtils.checkEqualsAndHashCode( - new DataStreamLifecycleService.ForceMergeRequestWrapper(originalRequest), + new ForceMergeRequestWrapper(originalRequest), DataStreamLifecycleServiceTests::copyForceMergeRequestWrapperRequest, DataStreamLifecycleServiceTests::mutateForceMergeRequestWrapper ); @@ -1718,18 +1718,14 @@ private ClusterState createClusterState(ProjectId projectId, String indexName, M return ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build(); } - private static DataStreamLifecycleService.ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest( - DataStreamLifecycleService.ForceMergeRequestWrapper original - ) { - return new DataStreamLifecycleService.ForceMergeRequestWrapper(original); + private static ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest(ForceMergeRequestWrapper original) { + return new ForceMergeRequestWrapper(original); } - private static DataStreamLifecycleService.ForceMergeRequestWrapper mutateForceMergeRequestWrapper( - DataStreamLifecycleService.ForceMergeRequestWrapper original - ) { + private static ForceMergeRequestWrapper mutateForceMergeRequestWrapper(ForceMergeRequestWrapper original) { switch (randomIntBetween(0, 4)) { case 0 -> { - DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); + ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); String[] originalIndices = original.indices(); int changedIndexIndex; if (originalIndices.length > 0) { @@ -1745,22 +1741,22 @@ private static DataStreamLifecycleService.ForceMergeRequestWrapper mutateForceMe return copy; } case 1 -> { - DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); + ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); copy.onlyExpungeDeletes(original.onlyExpungeDeletes() == false); return copy; } case 2 -> { - DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); + ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); copy.flush(original.flush() == false); return copy; } case 3 -> { - DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); + ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); copy.maxNumSegments(original.maxNumSegments() + 1); return copy; } case 4 -> { - DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); + ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original); copy.setRequestId(original.getRequestId() + 1); return copy; } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index 966e718b86859..9b8f0f75c7559 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -42,7 +42,6 @@ import org.elasticsearch.transport.TransportRequest; import org.junit.After; import org.junit.Before; -import org.mockito.Mockito; import java.time.Clock; import java.util.List; @@ -145,7 +144,7 @@ public void testMaybeForceMergeSubmitsForceMergeRequest() { ProjectState projectState = createProjectState(); DlmStepContext stepContext = createStepContext(projectState); - forceMergeStep.maybeForceMerge(indexName, stepContext); + forceMergeStep.maybeForceMerge(stepContext); assertThat(capturedForceMergeRequest.get(), is(notNullValue())); assertThat(capturedForceMergeRequest.get().indices().length, is(1)); @@ -157,7 +156,7 @@ public void testMaybeForceMergeSkipsWhenIndexNotInMetadata() { ProjectState projectState = buildProjectState(null); DlmStepContext stepContext = createStepContext(projectState); - forceMergeStep.maybeForceMerge(indexName, stepContext); + forceMergeStep.maybeForceMerge(stepContext); assertThat(capturedForceMergeRequest.get(), is(nullValue())); } @@ -166,7 +165,7 @@ public void testMaybeForceMergeSkipsWhenAlreadyComplete() { ProjectState projectState = createProjectStateWithSetting(true); DlmStepContext stepContext = createStepContext(projectState); - forceMergeStep.maybeForceMerge(indexName, stepContext); + forceMergeStep.maybeForceMerge(stepContext); assertThat(capturedForceMergeRequest.get(), is(nullValue())); } @@ -179,7 +178,7 @@ public void testMaybeForceMergeSuccessClearsErrorRecord() { errorStore.recordError(projectId, indexName, new RuntimeException("previous error")); assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); - forceMergeStep.maybeForceMerge(indexName, stepContext); + forceMergeStep.maybeForceMerge(stepContext); BroadcastResponse response = new BroadcastResponse(1, 1, 0, List.of()); capturedForceMergeListener.get().onResponse(response); @@ -198,7 +197,7 @@ public void testMaybeForceMergeRecordsErrorOnListenerFailure() { ProjectState projectState = createProjectState(); DlmStepContext stepContext = createStepContext(projectState); - forceMergeStep.maybeForceMerge(indexName, stepContext); + forceMergeStep.maybeForceMerge(stepContext); RuntimeException failure = new RuntimeException("force merge transport failure"); capturedForceMergeListener.get().onFailure(failure); @@ -224,9 +223,7 @@ public void testForceMergeFailsWhenShardsHaveFailures() { 0, new IllegalStateException("shard merge failed") ); - BroadcastResponse response = Mockito.mock(BroadcastResponse.class); - Mockito.when(response.getFailedShards()).thenReturn(1); - Mockito.when(response.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[] { shardFailure }); + BroadcastResponse response = new BroadcastResponse(1, 0, 1, List.of(shardFailure)); capturedForceMergeListener.get().onResponse(response); assertThat(capturedFailure.get(), is(notNullValue())); @@ -243,11 +240,7 @@ public void testForceMergeFailsWhenShardsPartiallySuccessful() { forceMergeStep.forceMerge(projectId, forceMergeRequest, ActionListener.wrap(v -> { throw new AssertionError("expected failure but got success"); }, capturedFailure::set), stepContext); - BroadcastResponse response = Mockito.mock(BroadcastResponse.class); - Mockito.when(response.getTotalShards()).thenReturn(5); - Mockito.when(response.getSuccessfulShards()).thenReturn(3); - Mockito.when(response.getFailedShards()).thenReturn(0); - Mockito.when(response.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[0]); + BroadcastResponse response = new BroadcastResponse(5, 3, 0, List.of()); capturedForceMergeListener.get().onResponse(response); assertThat(capturedFailure.get(), is(notNullValue())); assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); From a155873d49410930e42dc8c00fa8409d839b56c4 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Tue, 3 Mar 2026 15:21:33 -0500 Subject: [PATCH 14/16] use unavailable shards method --- .../lifecycle/transitions/steps/ForceMergeStep.java | 8 ++++---- .../lifecycle/transitions/steps/ForceMergeStepTests.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 441daf556ae68..b27d446eb36be 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -197,12 +197,12 @@ protected void forceMerge( : Arrays.stream(failures).map(DefaultShardOperationFailedException::toString).collect(Collectors.joining(",")) ); l.onFailure(new ElasticsearchException(message)); - } else if (forceMergeResponse.getTotalShards() != forceMergeResponse.getSuccessfulShards()) { + } else if (forceMergeResponse.getUnavailableShards() > 0) { String message = Strings.format( - "DLM failed while force merging index [%s]: only %d out of %d shards succeeded", + "DLM could not complete force merge for index [%s] because [%d] shards were unavailable." + + "This will be retried in the next cycle.", targetIndex, - forceMergeResponse.getSuccessfulShards(), - forceMergeResponse.getTotalShards() + forceMergeResponse.getUnavailableShards() ); l.onFailure(new ElasticsearchException(message)); } else { diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java index 9b8f0f75c7559..eedca67ad96cb 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStepTests.java @@ -245,7 +245,7 @@ public void testForceMergeFailsWhenShardsPartiallySuccessful() { assertThat(capturedFailure.get(), is(notNullValue())); assertThat(capturedFailure.get(), instanceOf(ElasticsearchException.class)); assertThat(capturedFailure.get().getMessage(), containsString(indexName)); - assertThat(capturedFailure.get().getMessage(), containsString("DLM failed while force merging")); + assertThat(capturedFailure.get().getMessage(), containsString("shards were unavailable")); } private ProjectState createProjectState() { From 9e83a2ce58e082c38e79df3d7bec3cfd1a4c7522 Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Tue, 3 Mar 2026 21:49:49 -0500 Subject: [PATCH 15/16] address PR feedback --- .../org/elasticsearch/datastreams/DataStreamsPlugin.java | 9 +++++++-- .../lifecycle/transitions/steps/ForceMergeStep.java | 6 ++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index e5014b81d59e0..ac2dee8f162e8 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -191,7 +192,9 @@ public List> getSettings() { pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING); - pluginSettings.add(ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING); + if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { + pluginSettings.add(ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING); + } return pluginSettings; } @@ -287,7 +290,9 @@ public List getActions() { actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class)); actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class)); actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class)); - actions.add(new ActionHandler(MarkIndexForDLMForceMergeAction.TYPE, TransportMarkIndexForDLMForceMergeAction.class)); + if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { + actions.add(new ActionHandler(MarkIndexForDLMForceMergeAction.TYPE, TransportMarkIndexForDLMForceMergeAction.class)); + } return actions; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index b27d446eb36be..4d1fd33e4747a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.lifecycle.ForceMergeRequestWrapper; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; @@ -159,7 +160,7 @@ void maybeForceMerge(DlmStepContext stepContext) { ForceMergeRequest forceMergeRequest = formForceMergeRequest(index.getName()); stepContext.executeDeduplicatedRequest( ForceMergeAction.NAME, - forceMergeRequest, + new ForceMergeRequestWrapper(forceMergeRequest), Strings.format("DLM service encountered an error trying to force merge index [%s]", index), (req, l) -> forceMerge(stepContext.projectId(), forceMergeRequest, l, stepContext) ); @@ -215,7 +216,8 @@ protected void forceMerge( private ForceMergeRequest formForceMergeRequest(String index) { ForceMergeRequest req = new ForceMergeRequest(index); req.maxNumSegments(SINGLE_SEGMENT); - return new ForceMergeRequestWrapper(req); + req.timeout(TimeValue.timeValueHours(6)); + return req; } /** From 46f1778f140b4216b004a83058b8b92aab6cbc0f Mon Sep 17 00:00:00 2001 From: Sean Zatz Date: Thu, 5 Mar 2026 11:29:16 -0500 Subject: [PATCH 16/16] adjust to max timeout for force merge --- .../datastreams/lifecycle/transitions/steps/ForceMergeStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java index 4d1fd33e4747a..a50b636ec6228 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/ForceMergeStep.java @@ -216,7 +216,7 @@ protected void forceMerge( private ForceMergeRequest formForceMergeRequest(String index) { ForceMergeRequest req = new ForceMergeRequest(index); req.maxNumSegments(SINGLE_SEGMENT); - req.timeout(TimeValue.timeValueHours(6)); + req.timeout(TimeValue.MAX_VALUE); return req; }