Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@
import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.ForceMergeStep;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.TransportMarkIndexForDLMForceMergeAction;
import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction;
Expand Down Expand Up @@ -186,9 +183,6 @@ public List<Setting<?>> getSettings() {
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
pluginSettings.add(ForceMergeStep.DLM_FORCE_MERGE_COMPLETE_SETTING);
}
return pluginSettings;
}

Expand Down Expand Up @@ -217,11 +211,6 @@ public Collection<?> createComponents(PluginServices services) {
)
);

// Register DLM actions here. Order matters - they will be executed in the order they are listed for a given index.
List<DlmAction> dlmActions = List.of();

verifyActions(dlmActions);

dataLifecycleInitialisationService.set(
new DataStreamLifecycleService(
settings,
Expand All @@ -233,8 +222,7 @@ public Collection<?> createComponents(PluginServices services) {
errorStoreInitialisationService.get(),
services.allocationService(),
dataStreamLifecycleErrorsPublisher.get(),
services.dataStreamGlobalRetentionSettings(),
dlmActions
services.dataStreamGlobalRetentionSettings()
)
);
dataLifecycleInitialisationService.get().init();
Expand All @@ -246,26 +234,6 @@ public Collection<?> createComponents(PluginServices services) {
return components;
}

// visible for testing
static void verifyActions(List<DlmAction> dlmActions) {
for (DlmAction action : dlmActions) {
if (action.steps().isEmpty()) {
throw new IllegalStateException("DLM action [" + action.name() + "] must have at least one step");
}
for (DlmStep step : action.steps()) {
if (step.possibleOutputIndexNamePatterns("dummy-index").isEmpty()) {
throw new IllegalStateException(
"DLM step ["
+ step.stepName()
+ "] in action ["
+ action.name()
+ "] must have at least one possible output index name pattern"
);
}
}
}
}

@Override
public List<ActionHandler> getActions() {
List<ActionHandler> actions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@
import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleIndexExecutor;
import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleToDS;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmActionContext;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -105,7 +101,6 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.ALL;
import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.BACKING_INDICES;
import static org.elasticsearch.cluster.metadata.DataStream.DatastreamIndexTypes.FAILURE_INDICES;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
Expand Down Expand Up @@ -179,7 +174,6 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
final ResultDeduplicator<Tuple<ProjectId, String>, Void> clusterStateChangesDeduplicator;
private final DataStreamLifecycleHealthInfoPublisher dslHealthInfoPublisher;
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
private final List<DlmAction> actions;
private LongSupplier nowSupplier;
private final Clock clock;
private final DataStreamLifecycleErrorStore errorStore;
Expand Down Expand Up @@ -232,8 +226,7 @@ public DataStreamLifecycleService(
DataStreamLifecycleErrorStore errorStore,
AllocationService allocationService,
DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher,
DataStreamGlobalRetentionSettings globalRetentionSettings,
List<DlmAction> actions
DataStreamGlobalRetentionSettings globalRetentionSettings
) {
this.settings = settings;
this.client = client;
Expand Down Expand Up @@ -274,7 +267,6 @@ public DataStreamLifecycleService(
new MarkIndicesForFrozenExecutor()
);
this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher;
this.actions = actions;
}

/**
Expand Down Expand Up @@ -512,19 +504,6 @@ private void run(ProjectState projectState) {
);
}

try {
indicesToExcludeForRemainingRun.addAll(maybeProcessDlmActions(projectState, dataStream, indicesToExcludeForRemainingRun));
} catch (Exception e) {
logger.warn(
() -> String.format(
Locale.ROOT,
"Data stream lifecycle failed to execute actions for data stream [%s]",
dataStream.getName()
),
e
);
}

affectedIndices += indicesToExcludeForRemainingRun.size();
affectedDataStreams++;
}
Expand Down Expand Up @@ -640,232 +619,6 @@ public void maybeMarkIndicesForFrozen(ProjectState projectState, Set<Index> indi
);
}

/**
* Processes Data Lifecycle Management (DLM) actions for the given data stream.
* <p>
* For each configured {@link DlmAction}, this method:
* * Determines if the action is scheduled for the data stream.
* * Finds indices eligible for the action, excluding those in {@code indicesToExclude}.
* * For each eligible index, iterates through the action's steps in reverse order until finding a step that is complete or
* reaching the start of the list
* * Iterate one step forward through the list to find the first incomplete step.
* * Execute the step handling and logging any exceptions.
* * Adds the index to {@code indicesToExclude} after a step is executed to avoid reprocessing in this run.
* <p>
* Any errors encountered during step completion checks or execution are logged, but do not prevent processing of
* other actions or indices.
*
* @param projectState the current project state
* @param dataStream the data stream to process
* @return The set of indices processed that should be ignored by later actions / included in stats
*/
// Visible for testing
Set<Index> maybeProcessDlmActions(ProjectState projectState, DataStream dataStream, Set<Index> indicesToExclude) {
HashSet<Index> indicesProcessed = new HashSet<>();
DlmActionContext actionContext = new DlmActionContext(
projectState,
transportActionsDeduplicator,
errorStore,
signallingErrorRetryInterval,
client,
Clock.systemUTC()
);
for (DlmAction action : actions) {

if (action.canRunOnProject(actionContext) == false) {
logger.trace(
"Skipping action [{}] for project [{}] as prerequisites are not met",
action.name(),
projectState != null ? projectState.projectId() : "unknown"
);
continue;
}

TimeValue actionSchedule = action.applyAfterTime().apply(dataStream.getDataLifecycle());

if (actionSchedule == null) {
logger.trace(
"Data stream lifecycle action [{}] is not scheduled for data stream [{}]",
action.name(),
dataStream.getName()
);
continue;
}

long actionStartTime = nowSupplier.getAsLong();

List<Index> indicesEligibleForAction;
if (action.appliesToFailureStore()) {
indicesEligibleForAction = dataStream.getIndicesOlderThan(
indexName -> projectState.metadata().index(indexName),
nowSupplier,
actionSchedule,
ALL
);
} else {
indicesEligibleForAction = dataStream.getIndicesOlderThan(
indexName -> projectState.metadata().index(indexName),
nowSupplier,
actionSchedule,
BACKING_INDICES
);
}

indicesEligibleForAction.removeAll(indicesToExclude);
indicesEligibleForAction.removeAll(indicesProcessed);

logger.trace(
"Data stream lifecycle action [{}] found [{}] eligible indices for data stream [{}]",
action.name(),
indicesEligibleForAction.size(),
dataStream.getName()
);

for (Index index : indicesEligibleForAction) {
long findStepStartTime = nowSupplier.getAsLong();
int stepToExecuteIndex = findFirstIncompleteStepIndex(projectState, dataStream, action, index);
if (logger.isTraceEnabled()) {
long findStepDuration = nowSupplier.getAsLong() - findStepStartTime;
logger.trace(
"Finding first incomplete step for action [{}] on datastream [{}] index [{}] took [{}]",
action.name(),
dataStream.getName(),
index.getName(),
formatExecutionTime(findStepDuration)
);
}

if (stepToExecuteIndex >= 0) {
DlmStep stepToExecute = action.steps().get(stepToExecuteIndex);
try {
logger.trace(
"Executing step [{}] for action [{}] on datastream [{}] index [{}]",
stepToExecute.stepName(),
action.name(),
dataStream.getName(),
index.getName()
);
long stepStartTime = nowSupplier.getAsLong();
Index indexForExecution = resolveIndexOutputFromPreviousStep(stepToExecuteIndex, index, action, projectState);
DlmStepContext dlmStepContext = actionContext.stepContextFor(indexForExecution);
stepToExecute.execute(dlmStepContext);
if (logger.isTraceEnabled()) {
long stepDuration = nowSupplier.getAsLong() - stepStartTime;
logger.trace(
"Executed step [{}] for action [{}] on datastream [{}] index [{}] in [{}]",
stepToExecute.stepName(),
action.name(),
dataStream.getName(),
index.getName(),
formatExecutionTime(stepDuration)
);
}
} catch (Exception ex) {
logger.warn(
logger.getMessageFactory()
.newMessage(
"Unable to execute step [{}] for action [{}] on datastream [{}] index [{}]",
stepToExecute.stepName(),
action.name(),
dataStream.getName(),
index.getName()
),
ex
);
continue;
}
indicesProcessed.add(index);
}
}
if (logger.isTraceEnabled()) {
long actionDuration = nowSupplier.getAsLong() - actionStartTime;
logger.trace(
"Data stream lifecycle action [{}] for data stream [{}] completed in [{}]",
action.name(),
dataStream.getName(),
formatExecutionTime(actionDuration)
);
}
}
return indicesProcessed;
}

private int findFirstIncompleteStepIndex(ProjectState projectState, DataStream dataStream, DlmAction action, Index index) {
assert action.steps().size() >= 1 : "an action must have at least one step";
int stepToExecute = -1;
for (int i = action.steps().size() - 1; i >= 0; i--) {
DlmStep step = action.steps().get(i);
try {
long checkStartTime = nowSupplier.getAsLong();
Index indexInUse = resolveIndexOutputFromPreviousStep(i, index, action, projectState);
if (step.stepCompleted(indexInUse, projectState) == false) {
stepToExecute = i;
if (logger.isTraceEnabled()) {
logger.trace(
"Step [{}] for action [{}] on datastream [{}] index [{}] is not complete, checked in [{}]",
step.stepName(),
action.name(),
dataStream.getName(),
indexInUse.getName(),
formatExecutionTime(nowSupplier.getAsLong() - checkStartTime)
);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace(
"Step [{}] for action [{}] on datastream [{}] index [{}] is already complete, checked in [{}]",
step.stepName(),
action.name(),
dataStream.getName(),
indexInUse.getName(),
formatExecutionTime(nowSupplier.getAsLong() - checkStartTime)
);
}
break;
}
} catch (Exception ex) {
logger.warn(
logger.getMessageFactory()
.newMessage(
"Unable to execute check for step complete [{}] for action [{}] on datastream [{}] index [{}]",
step.stepName(),
action.name(),
dataStream.getName(),
index.getName()
),
ex
);
}
}
return stepToExecute;
}

// visible for testing
Index resolveIndexOutputFromPreviousStep(int stepToExecuteIndex, Index index, DlmAction action, ProjectState projectState) {
if (stepToExecuteIndex < 1) {
return index;
}

DlmStep previousStep = action.steps().get(stepToExecuteIndex - 1);
List<String> possibleIndexNames = previousStep.possibleOutputIndexNamePatterns(index.getName());

return possibleIndexNames.stream()
.filter(projectState.metadata()::hasIndex)
.findFirst()
.map(possibleName -> projectState.metadata().index(possibleName).getIndex())
.orElseGet(() -> {
assert false
: "Unable to resolve index name for executing step ["
+ action.steps().get(stepToExecuteIndex).stepName()
+ "] for action ["
+ action.name()
+ "] with index ["
+ index.getName()
+ "]";
return index;
});
}

// visible for testing
static Set<Index> timeSeriesIndicesStillWithinTimeBounds(ProjectMetadata project, List<Index> targetIndices, LongSupplier nowSupplier) {
Set<Index> tsIndicesWithinBounds = new HashSet<>();
Expand Down
Loading
Loading