Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
31d6c0d
Add DLM force merge operation helpers
seanzatzdev Feb 27, 2026
2731ae7
adapt data lifecycle service force merge code
seanzatzdev Feb 27, 2026
9d0a1c8
Update modules/data-streams/src/main/java/org/elasticsearch/datastrea…
seanzatzdev Feb 27, 2026
69aa5c4
PR feedback
seanzatzdev Feb 27, 2026
a1c64ef
[CI] Auto commit changes from spotless
Feb 27, 2026
ec18cd9
Merge branch 'main' into dlm-force-merge-operation
seanzatzdev Feb 27, 2026
1b45498
implement execute(), add completeness check before running maybeForce…
seanzatzdev Feb 27, 2026
6b8c255
add unit test for when force merge is already complete
seanzatzdev Feb 27, 2026
147b93c
add check for nonexistent index
seanzatzdev Feb 27, 2026
a4e3443
update comment
seanzatzdev Feb 27, 2026
ba18299
Update modules/data-streams/src/main/java/org/elasticsearch/datastrea…
seanzatzdev Mar 2, 2026
a2783d1
[CI] Auto commit changes from spotless
Mar 2, 2026
def11f4
Update modules/data-streams/src/main/java/org/elasticsearch/datastrea…
seanzatzdev Mar 2, 2026
aba3ac9
Merge branch 'elastic:main' into dlm-force-merge-operation
seanzatzdev Mar 2, 2026
da58897
respond to PR feedback
seanzatzdev Mar 2, 2026
8024964
Merge branch 'elastic:main' into dlm-force-merge-operation
seanzatzdev Mar 3, 2026
a155873
use unavailable shards method
seanzatzdev Mar 3, 2026
9e83a2c
address PR feedback
seanzatzdev Mar 4, 2026
7c46f92
Merge branch 'main' into dlm-force-merge-operation
seanzatzdev Mar 4, 2026
46f1778
adjust to max timeout for force merge
seanzatzdev Mar 5, 2026
64cb76b
Merge branch 'main' into dlm-force-merge-operation
seanzatzdev Mar 5, 2026
2328dd6
Merge branch 'main' into dlm-force-merge-operation
seanzatzdev Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -191,7 +192,9 @@ public List<Setting<?>> getSettings() {
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
pluginSettings.add(ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING);
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
pluginSettings.add(ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING);
}
return pluginSettings;
}

Expand Down Expand Up @@ -287,7 +290,9 @@ public List<ActionHandler> getActions() {
actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class));
actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class));
actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class));
actions.add(new ActionHandler(MarkIndexForDLMForceMergeAction.TYPE, TransportMarkIndexForDLMForceMergeAction.class));
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
actions.add(new ActionHandler(MarkIndexForDLMForceMergeAction.TYPE, TransportMarkIndexForDLMForceMergeAction.class));
}
return actions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,58 +1842,4 @@ public void markIndexForDlmForceMerge(
);
}

/**
 * This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator.
 * It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't
 * care about it for data stream lifecycle deduplication. This class is non-private for the sake of unit testing, but should not be used
 * outside of Data Stream Lifecycle Service.
 */
static final class ForceMergeRequestWrapper extends ForceMergeRequest {
ForceMergeRequestWrapper(ForceMergeRequest original) {
// Copy every field that participates in equals/hashCode below; forceMergeUUID is
// deliberately NOT copied, since it is irrelevant for deduplication purposes.
super(original.indices());
this.maxNumSegments(original.maxNumSegments());
this.onlyExpungeDeletes(original.onlyExpungeDeletes());
this.flush(original.flush());
this.indicesOptions(original.indicesOptions());
this.setShouldStoreResult(original.getShouldStoreResult());
this.setRequestId(original.getRequestId());
this.timeout(original.timeout());
this.setParentTask(original.getParentTask());
}

// Equality compares exactly the fields copied in the constructor, so two logically
// identical force merge requests deduplicate to a single in-flight operation.
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ForceMergeRequest that = (ForceMergeRequest) o;
return Arrays.equals(indices, that.indices())
&& maxNumSegments() == that.maxNumSegments()
&& onlyExpungeDeletes() == that.onlyExpungeDeletes()
&& flush() == that.flush()
&& Objects.equals(indicesOptions(), that.indicesOptions())
&& getShouldStoreResult() == that.getShouldStoreResult()
&& getRequestId() == that.getRequestId()
&& Objects.equals(timeout(), that.timeout())
&& Objects.equals(getParentTask(), that.getParentTask());
}

// Must stay consistent with equals: same field set, forceMergeUUID excluded.
@Override
public int hashCode() {
return Objects.hash(
Arrays.hashCode(indices),
maxNumSegments(),
onlyExpungeDeletes(),
flush(),
indicesOptions(),
getShouldStoreResult(),
getRequestId(),
timeout(),
getParentTask()
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.lifecycle;

import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;

import java.util.Arrays;
import java.util.Objects;

/**
 * Wraps a {@link ForceMergeRequest} purely to supply the equals and hashCode semantics needed by
 * transportActionsDeduplicator. The forceMergeUUID is deliberately excluded from the comparison
 * (as ForceMergeRequest's own equals/hashCode would have to exclude it, if they existed) because
 * it does not matter for data stream lifecycle deduplication. The class is public so that other
 * data stream lifecycle management components can reuse it (for example when forming DLM
 * force-merge requests), but it is not intended as a general-purpose Elasticsearch public API.
 */
public final class ForceMergeRequestWrapper extends ForceMergeRequest {
    public ForceMergeRequestWrapper(ForceMergeRequest original) {
        // Carry over every field that participates in equals/hashCode; the forceMergeUUID is
        // intentionally left behind since deduplication ignores it.
        super(original.indices());
        maxNumSegments(original.maxNumSegments());
        onlyExpungeDeletes(original.onlyExpungeDeletes());
        flush(original.flush());
        indicesOptions(original.indicesOptions());
        setShouldStoreResult(original.getShouldStoreResult());
        setRequestId(original.getRequestId());
        timeout(original.timeout());
        setParentTask(original.getParentTask());
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        if (other == null || other.getClass() != getClass()) {
            return false;
        }
        final ForceMergeRequest request = (ForceMergeRequest) other;
        return Arrays.equals(indices, request.indices())
            && maxNumSegments() == request.maxNumSegments()
            && onlyExpungeDeletes() == request.onlyExpungeDeletes()
            && flush() == request.flush()
            && getShouldStoreResult() == request.getShouldStoreResult()
            && getRequestId() == request.getRequestId()
            && Objects.equals(indicesOptions(), request.indicesOptions())
            && Objects.equals(timeout(), request.timeout())
            && Objects.equals(getParentTask(), request.getParentTask());
    }

    @Override
    public int hashCode() {
        // Argument order mirrors the field order used by the constructor; keep it consistent
        // with equals so the hashCode contract holds.
        return Objects.hash(
            Arrays.hashCode(indices),
            maxNumSegments(),
            onlyExpungeDeletes(),
            flush(),
            indicesOptions(),
            getShouldStoreResult(),
            getRequestId(),
            timeout(),
            getParentTask()
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,29 @@

package org.elasticsearch.datastreams.lifecycle.transitions.steps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.ForceMergeRequestWrapper;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext;
import org.elasticsearch.index.Index;

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* A DLM step responsible for force merging the index.
Expand All @@ -41,6 +51,8 @@ public class ForceMergeStep implements DlmStep {
private static final Settings FORCE_MERGE_COMPLETE_SETTINGS = Settings.builder()
.put(DLM_FORCE_MERGE_COMPLETE_SETTING.getKey(), true)
.build();
private static final int SINGLE_SEGMENT = 1;
private static final Logger logger = LogManager.getLogger(ForceMergeStep.class);

/**
* Determines if the step has been completed for the given index and project state.
Expand All @@ -58,11 +70,11 @@ public boolean stepCompleted(Index index, ProjectState projectState) {
* This method determines how to execute the step and performs the necessary operations to update the index
* so that {@link #stepCompleted(Index, ProjectState)} will return true after successful execution.
*
* @param stepContext The context and resources for executing the step.
*/
@Override
public void execute(DlmStepContext stepContext) {
    // Delegates to maybeForceMerge, which performs the existence/completeness checks
    // before issuing the (deduplicated) force merge request.
    maybeForceMerge(stepContext);
}

/**
Expand Down Expand Up @@ -122,6 +134,92 @@ protected void markDLMForceMergeComplete(DlmStepContext stepContext, ActionListe
);
}

/**
 * Forms the force merge request for the index in the given step context and hands it to the
 * context for deduplicated execution, delegating the actual call to {@link #forceMerge}.
 * Execution is skipped when the index is absent from the project metadata, or when the force
 * merge has already been marked complete for the index.
 */
void maybeForceMerge(DlmStepContext stepContext) {
    final Index index = stepContext.index();
    final ProjectState projectState = stepContext.projectState();

    // The index may have disappeared (e.g. been deleted) since this step was scheduled.
    final boolean indexExists = projectState != null
        && projectState.metadata() != null
        && projectState.metadata().index(index) != null;
    if (indexExists == false) {
        logger.warn("Index [{}] not found in project metadata, skipping force merge step", index);
        return;
    }

    if (isDLMForceMergeComplete(index, projectState)) {
        logger.info("DLM force merge step is already completed for index [{}], skipping execution", stepContext.indexName());
        return;
    }

    final ForceMergeRequest request = formForceMergeRequest(index.getName());
    stepContext.executeDeduplicatedRequest(
        ForceMergeAction.NAME,
        new ForceMergeRequestWrapper(request),
        Strings.format("DLM service encountered an error trying to force merge index [%s]", index),
        (req, l) -> forceMerge(stepContext.projectId(), request, l, stepContext)
    );
}

/**
 * This method executes the given force merge request. Once the request has completed successfully it updates
 * the {@link #DLM_FORCE_MERGE_COMPLETE_SETTING} in the cluster state indicating that the force merge has completed.
 * The listener is notified after the cluster state update has been made, or when the force merge fails or the
 * update to the cluster state fails.
 */
protected void forceMerge(
    ProjectId projectId,
    ForceMergeRequest forceMergeRequest,
    ActionListener<Void> listener,
    DlmStepContext stepContext
) {
    assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1 : "DLM force merges one index at a time";

    final String targetIndex = forceMergeRequest.indices()[0];
    logger.info("DLM is issuing a request to force merge index [{}] to a single segment", targetIndex);
    stepContext.client()
        .projectClient(projectId)
        .admin()
        .indices()
        .forceMerge(forceMergeRequest, listener.delegateFailureAndWrap((l, forceMergeResponse) -> {
            if (forceMergeResponse.getFailedShards() > 0) {
                // Shard-level failures: fail the step with the collected failure details.
                DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures();
                String message = Strings.format(
                    "DLM failed to force merge %d shards for index [%s] due to failures [%s]",
                    forceMergeResponse.getFailedShards(),
                    targetIndex,
                    failures == null
                        ? "unknown"
                        : Arrays.stream(failures).map(DefaultShardOperationFailedException::toString).collect(Collectors.joining(","))
                );
                l.onFailure(new ElasticsearchException(message));
            } else if (forceMergeResponse.getUnavailableShards() > 0) {
                // Fix: the two concatenated string literals had no separating space, so the
                // message previously rendered as "...unavailable.This will be retried...".
                String message = Strings.format(
                    "DLM could not complete force merge for index [%s] because [%d] shards were unavailable. "
                        + "This will be retried in the next cycle.",
                    targetIndex,
                    forceMergeResponse.getUnavailableShards()
                );
                l.onFailure(new ElasticsearchException(message));
            } else {
                // Success: persist the completion marker; the listener completes after the
                // cluster state update is applied (or fails if that update fails).
                logger.info("DLM successfully force merged index [{}]", targetIndex);
                markDLMForceMergeComplete(stepContext, listener);
            }
        }));
}

/**
 * Builds the force merge request that DLM issues for a single index: merge down to one
 * segment, with an effectively unbounded timeout since force merges can run for a long time.
 */
private ForceMergeRequest formForceMergeRequest(String index) {
    final ForceMergeRequest request = new ForceMergeRequest(index);
    request.maxNumSegments(SINGLE_SEGMENT);
    request.timeout(TimeValue.MAX_VALUE);
    return request;
}

/**
* A human-readable name for the step.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,7 @@ public void testForceMergeRequestWrapperEqualsHashCode() {
originalRequest.onlyExpungeDeletes(randomBoolean());
originalRequest.flush(randomBoolean());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
new DataStreamLifecycleService.ForceMergeRequestWrapper(originalRequest),
new ForceMergeRequestWrapper(originalRequest),
DataStreamLifecycleServiceTests::copyForceMergeRequestWrapperRequest,
DataStreamLifecycleServiceTests::mutateForceMergeRequestWrapper
);
Expand Down Expand Up @@ -1718,18 +1718,14 @@ private ClusterState createClusterState(ProjectId projectId, String indexName, M
return ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
}

private static DataStreamLifecycleService.ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest(
DataStreamLifecycleService.ForceMergeRequestWrapper original
) {
return new DataStreamLifecycleService.ForceMergeRequestWrapper(original);
private static ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest(ForceMergeRequestWrapper original) {
return new ForceMergeRequestWrapper(original);
}

private static DataStreamLifecycleService.ForceMergeRequestWrapper mutateForceMergeRequestWrapper(
DataStreamLifecycleService.ForceMergeRequestWrapper original
) {
private static ForceMergeRequestWrapper mutateForceMergeRequestWrapper(ForceMergeRequestWrapper original) {
switch (randomIntBetween(0, 4)) {
case 0 -> {
DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
String[] originalIndices = original.indices();
int changedIndexIndex;
if (originalIndices.length > 0) {
Expand All @@ -1745,22 +1741,22 @@ private static DataStreamLifecycleService.ForceMergeRequestWrapper mutateForceMe
return copy;
}
case 1 -> {
DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
copy.onlyExpungeDeletes(original.onlyExpungeDeletes() == false);
return copy;
}
case 2 -> {
DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
copy.flush(original.flush() == false);
return copy;
}
case 3 -> {
DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
copy.maxNumSegments(original.maxNumSegments() + 1);
return copy;
}
case 4 -> {
DataStreamLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
copy.setRequestId(original.getRequestId() + 1);
return copy;
}
Expand Down
Loading