Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
Expand All @@ -35,6 +40,7 @@
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
Expand Down Expand Up @@ -64,13 +70,15 @@
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -86,9 +94,12 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
Expand Down Expand Up @@ -117,6 +128,7 @@

public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
private static final Logger logger = LogManager.getLogger(DataStreamLifecycleServiceIT.class);
private static final String DEFAULT_REPO = "my-repo";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -895,7 +907,7 @@ public void testReenableDataStreamLifecycle() throws Exception {

public void testLifecycleAppliedToFailureStore() throws Exception {
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.failuresLifecycleBuilder()
.dataRetention(TimeValue.timeValueSeconds(20))
.dataRetention(TimeValue.timeValueMinutes(20))
Copy link
Copy Markdown
Member Author

@dakrone dakrone Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case you are wondering why this seemingly unrelated change was made: I ran these tests many many times while I was developing the test for this PR. This one in particular was flaky, because on a slower machine the index ended up deleted before we could do the check. This change makes the test no longer flaky on my machine.

It does not actually change the test behavior, or what we're testing for this particular test.

.buildTemplate();

putComposableIndexTemplate("id1", """
Expand Down Expand Up @@ -937,17 +949,27 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
ByteSizeValue targetFloor = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(clusterSettings);

assertBusy(() -> {
GetSettingsRequest getSettingsRequest = new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(firstGenerationIndex)
.includeDefaults(true);
GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
assertThat(
getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()),
is(targetFactor.toString())
);
assertThat(
getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()),
is(targetFloor.getStringRep())
);
try {
GetSettingsRequest getSettingsRequest = new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(firstGenerationIndex)
.includeDefaults(true);
GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
assertThat(
getSettingsResponse.getSetting(
firstGenerationIndex,
MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()
),
is(targetFactor.toString())
);
assertThat(
getSettingsResponse.getSetting(
firstGenerationIndex,
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()
),
is(targetFloor.getStringRep())
);
} catch (IndexNotFoundException e) {
fail("expected index " + firstGenerationIndex + " to exist but it did not.");
}
});

updateFailureStoreConfiguration(dataStreamName, true, TimeValue.timeValueSeconds(1));
Expand All @@ -967,7 +989,84 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
List<Index> retrievedFailureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
assertThat(retrievedFailureIndices.size(), equalTo(1));
assertThat(retrievedFailureIndices.get(0).getName(), equalTo(secondGenerationIndex));
});
}, 30, TimeUnit.SECONDS);
}

public void testCollectAndMarkIndicesForFrozen() throws Exception {
assumeTrue("requires feature flag to be enabled", DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled());

client().execute(
TransportPutRepositoryAction.TYPE,
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, DEFAULT_REPO).name(DEFAULT_REPO)
.type("fs")
.settings(Settings.builder().put("location", DEFAULT_REPO))
).get();
updateClusterSettings(Settings.builder().put(RepositoriesService.DEFAULT_REPOSITORY_SETTING.getKey(), DEFAULT_REPO));

DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder()
.frozenAfter(TimeValue.timeValueDays(1))
.buildTemplate();

Iterable<DataStreamLifecycleService> dataStreamLifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
Clock clock = Clock.systemUTC();
AtomicLong now = new AtomicLong(clock.millis());
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(now::get));

putComposableIndexTemplate(
"mytemplate",
null,
List.of("foo*"),
Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1").build(),
null,
lifecycle,
null,
false
);

String dataStream = "foo-ds";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
dataStream
);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

indexDocs(dataStream, randomIntBetween(10, 50));

// Let's verify the rollover
List<String> backingIndices = waitForDataStreamIndices(dataStream, 2, false);
String candidateIndex = backingIndices.get(0);

AtomicLong twoDaysLater = new AtomicLong(clock.millis() + TimeValue.timeValueDays(2).millis());
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(twoDaysLater::get));

assertBusy(() -> {
logger.info("--> checking to see if index has been marked for frozen");
ClusterStateResponse resp = client().execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT)).get();
ClusterState state = resp.getState();
String setRepo = Optional.ofNullable(state.metadata().getProject(Metadata.DEFAULT_PROJECT_ID))
.map(pm -> pm.index(candidateIndex))
.map(peek(im -> logger.info("--> found index {}", candidateIndex)))
.map(im -> im.getCustomData(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY))
.map(peek(custom -> logger.info("--> index {} has custom metadata: {}", candidateIndex, custom)))
.map(meta -> meta.get(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY))
.map(peek(repo -> logger.info("--> index {} has repo {} configured", candidateIndex, repo)))
.orElse("_unset_");
logger.info("--> repository set to: {}", setRepo);
assertThat(setRepo, equalTo(DEFAULT_REPO));
}, 30, TimeUnit.SECONDS);
Comment on lines +1010 to +1057
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reset the injected clock in a finally.

This test mutates every DataStreamLifecycleService via setNowSupplier(...) and never restores it. With randomized internal-cluster test execution, later methods can inherit the frozen clock and fail nondeterministically. Please mirror the try/finally reset pattern already used in testSystemDataStreamRetention.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java`
around lines 999 - 1047, The test mutates every DataStreamLifecycleService by
calling setNowSupplier(...) and never restores it; wrap the mutation and
assertions in a try/finally: before you change suppliers capture each instance's
original supplier (iterate
internalCluster().getInstances(DataStreamLifecycleService.class) and store
current supplier references), set the test supplier (now::get /
twoDaysLater::get) as you do now, then in a finally block restore each
DataStreamLifecycleService by calling setNowSupplier(originalSupplier) so the
injected clock is reset; follow the same try/finally/reset pattern used in
testSystemDataStreamRetention and reference
DataStreamLifecycleService.setNowSupplier, internalCluster().getInstances,
now/twoDaysLater suppliers in your change.


dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(clock::millis));
}

/**
* Helper for peeking Optionals
*/
<T> UnaryOperator<T> peek(Consumer<T> c) {
return x -> {
c.accept(x);
return x;
};
}

static void indexDocs(String dataStream, int numDocs) {
Expand Down
1 change: 1 addition & 0 deletions modules/data-streams/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
requires org.elasticsearch.xcontent;
requires org.apache.logging.log4j;
requires org.apache.lucene.core;
requires org.elasticsearch.logging;

exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server;
Expand Down
Loading
Loading