Mark indices ready for frozen conversion in DLM service #144248

dakrone merged 19 commits into elastic:main
Conversation
This commit enhances the DLM service (`DatastreamLifecycleService`) to collect indices that are ready to be converted to a frozen index, and then mark those indices with the repository in which they should be converted. This behavior is behind the DLM frozen feature flag, and the marking only happens if the cluster already has a configured `repositories.default_repository` setting.
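The gating described above (feature flag plus a configured `repositories.default_repository`) can be sketched in plain Java. This is an illustrative stand-in, not the actual Elasticsearch API: the real service reads a `Settings` object and a feature-flag class, whereas here a boolean and a plain map stand in for both.

```java
import java.util.Map;
import java.util.Optional;

public class FrozenMarkingGate {
    // Name of the cluster setting mentioned in the PR description.
    static final String DEFAULT_REPOSITORY_SETTING = "repositories.default_repository";

    // Returns the repository to mark candidate indices with, or empty when the
    // feature flag is off or no default repository is configured; in the empty
    // case the DLM run skips the marking step entirely.
    static Optional<String> repositoryForMarking(boolean frozenFeatureFlagEnabled, Map<String, String> clusterSettings) {
        if (frozenFeatureFlagEnabled == false) {
            return Optional.empty();
        }
        String repo = clusterSettings.get(DEFAULT_REPOSITORY_SETTING);
        return (repo == null || repo.isBlank()) ? Optional.empty() : Optional.of(repo);
    }
}
```

With both conditions met, the returned repository name is what gets written into the candidate index's custom metadata.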
Pinging @elastic/es-storage-engine (Team:StorageEngine)
    public void testLifecycleAppliedToFailureStore() throws Exception {
        DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.failuresLifecycleBuilder()
-           .dataRetention(TimeValue.timeValueSeconds(20))
+           .dataRetention(TimeValue.timeValueMinutes(20))
In case you are wondering why this seemingly unrelated change was made: I ran these tests many many times while I was developing the test for this PR. This one in particular was flaky, because on a slower machine the index ended up deleted before we could do the check. This change makes the test no longer flaky on my machine.
It does not actually change the test behavior, or what we're testing for this particular test.
@coderabbitai review

✅ Actions performed: Review triggered.
📝 Walkthrough

This change implements a feature for marking Elasticsearch data stream indices as candidates for freezing. The implementation introduces a new cluster state task.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java`:
- Around lines 999-1047: The test mutates every DataStreamLifecycleService by calling setNowSupplier(...) and never restores it. Wrap the mutation and assertions in a try/finally: before changing suppliers, capture each instance's original supplier (iterate internalCluster().getInstances(DataStreamLifecycleService.class) and store the current supplier references); set the test supplier (now::get / twoDaysLater::get) as you do now; then, in a finally block, restore each DataStreamLifecycleService by calling setNowSupplier(originalSupplier) so the injected clock is reset. Follow the same try/finally/reset pattern used in testSystemDataStreamRetention.
In `modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java`:
- Around lines 1975-1977: Fix the Javadoc typo in DataStreamLifecycleService by updating the comment above the executor declaration that currently reads "Executor for marking indices for conversation to frozen" to "Executor for marking indices for conversion to frozen".
- Around lines 546-548: The catch block currently logs "Data stream lifecycle failed to mark candidates for converting to frozen index for data stream [%s]", but no data stream name is available there, so the [%s] placeholder remains literal. Remove the unused placeholder (log "Data stream lifecycle failed to mark candidates for converting to frozen index") and pass the Exception e as the throwable, so the stack trace is preserved and no unsubstituted placeholder is emitted.
📒 Files selected for processing (6)
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
modules/data-streams/src/main/java/module-info.java
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTask.java
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndicesForFrozenTaskTests.java
        Iterable<DataStreamLifecycleService> dataStreamLifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
        Clock clock = Clock.systemUTC();
        AtomicLong now = new AtomicLong(clock.millis());
        dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(now::get));

        putComposableIndexTemplate(
            "mytemplate",
            null,
            List.of("foo*"),
            Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1").build(),
            null,
            lifecycle,
            null,
            false
        );

        String dataStream = "foo-ds";
        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
            TEST_REQUEST_TIMEOUT,
            TEST_REQUEST_TIMEOUT,
            dataStream
        );
        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

        indexDocs(dataStream, randomIntBetween(10, 50));

        // Let's verify the rollover
        List<String> backingIndices = waitForDataStreamIndices(dataStream, 2, false);
        String candidateIndex = backingIndices.get(0);
        String writeIndex = backingIndices.get(1);

        AtomicLong twoDaysLater = new AtomicLong(clock.millis() + TimeValue.timeValueDays(2).millis());
        dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(twoDaysLater::get));

        assertBusy(() -> {
            logger.info("--> checking to see if index has been marked for frozen");
            ClusterStateResponse resp = client().execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT)).get();
            ClusterState state = resp.getState();
            String setRepo = Optional.ofNullable(state.metadata().getProject(Metadata.DEFAULT_PROJECT_ID))
                .map(pm -> pm.index(candidateIndex))
                .map(peek(im -> logger.info("--> found index {}", candidateIndex)))
                .map(im -> im.getCustomData(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY))
                .map(peek(custom -> logger.info("--> index {} has custom metadata: {}", candidateIndex, custom)))
                .map(meta -> meta.get(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY))
                .map(peek(repo -> logger.info("--> index {} has repo {} configured", candidateIndex, repo)))
                .orElse("_unset_");
            logger.info("--> repository set to: {}", setRepo);
            assertThat(setRepo, equalTo(DEFAULT_REPO));
        }, 30, TimeUnit.SECONDS);
Reset the injected clock in a finally.
This test mutates every DataStreamLifecycleService via setNowSupplier(...) and never restores it. With randomized internal-cluster test execution, later methods can inherit the frozen clock and fail nondeterministically. Please mirror the try/finally reset pattern already used in testSystemDataStreamRetention.
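The capture-and-restore pattern the reviewer is asking for can be sketched in plain Java. `LifecycleServiceStub` is an illustrative stand-in for `DataStreamLifecycleService`, keeping only the `setNowSupplier` hook that matters here:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Stand-in for DataStreamLifecycleService: only the clock-injection hook is modeled.
class LifecycleServiceStub {
    private LongSupplier nowSupplier = System::currentTimeMillis;

    void setNowSupplier(LongSupplier supplier) { this.nowSupplier = supplier; }
    LongSupplier getNowSupplier() { return nowSupplier; }
}

public class ClockResetPattern {
    // Runs `testBody` with a fixed clock injected into every service, and restores
    // the original suppliers in a finally block so later tests are unaffected.
    static void withFrozenClock(List<LifecycleServiceStub> services, long fixedMillis, Runnable testBody) {
        // Capture each instance's original supplier before mutating anything.
        Map<LifecycleServiceStub, LongSupplier> originals = new HashMap<>();
        for (LifecycleServiceStub service : services) {
            originals.put(service, service.getNowSupplier());
        }
        try {
            services.forEach(s -> s.setNowSupplier(() -> fixedMillis));
            testBody.run();
        } finally {
            // Reset the injected clock even if the test body throws.
            originals.forEach(LifecycleServiceStub::setNowSupplier);
        }
    }
}
```

The finally block runs on both success and failure, which is what prevents a failing assertion from leaking the frozen clock into subsequent test methods.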
lukewhiting left a comment:

Looking good :-) Just a few bits to tidy up
        }

        // Update the custom metadata with the key (dlm_freeze_with) and value of the currently configured default repository
        newMetadata.put(DataStreamLifecycleService.FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, defaultRepository);
Leaving a note to myself: I should probably update the converter method to use this metadata for the repository checks.
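The marking step in the snippet above, and the follow-up read the note describes, can be sketched with plain maps. The key name `dlm_freeze_with` comes from the inline comment; the map-based shape is an illustrative simplification of the real IndexMetadata custom-data plumbing:

```java
import java.util.HashMap;
import java.util.Map;

public class FrozenCandidateMetadata {
    // Key under which the target repository is recorded (per the inline comment above).
    static final String FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY = "dlm_freeze_with";

    // Record the currently configured default repository on the index's lifecycle
    // custom metadata, so the later conversion step uses a consistent repository.
    static Map<String, String> markForFreezing(Map<String, String> existingCustomMetadata, String defaultRepository) {
        Map<String, String> newMetadata = new HashMap<>(existingCustomMetadata);
        newMetadata.put(FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, defaultRepository);
        return Map.copyOf(newMetadata);
    }

    // The converter reads the marked repository back instead of re-resolving the
    // default setting, so a settings change mid-process cannot switch repositories.
    static String repositoryForConversion(Map<String, String> customMetadata) {
        return customMetadata.getOrDefault(FROZEN_CANDIDATE_REPOSITORY_METADATA_KEY, "_unset_");
    }
}
```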
This framework was behind a feature flag (and thus not used in production). We are moving to a different execution model for the frozen conversion. Relates to elastic#144248 (comment)
* Refactor DataStream.getIndicesOlderThan to return a Set

  Previously this method returned a list of indices, but it now returns a set. These indices don't actually need to be iterated in order for the DLM actions. Relates to #144248 (comment)

* Return unmodifiable set to be consistent
* Now that it's immutable, we need a mutable version in DLM service
* [CI] Auto commit changes from spotless

Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
* Mark indices ready for frozen conversion in DLM service

  This commit enhances the DLM service (`DatastreamLifecycleService`) to collect indices that are ready to be converted to a frozen index, and then mark those indices with the repository in which they should be converted. This behavior is behind the DLM frozen feature flag, and the marking only happens if the cluster already has a configured `repositories.default_repository` setting.

* Remove extra %s
* Let's not just talk about converting, let's actually convert
* Index -> Indices for executor
* Remove unused variable
* Add missing custom metadata in test
* Unset timer after test
(#144511) Use marked repository name in `DataStreamLifecycleConvertToFrozen`

As a result of #144248 we mark the repository name to use for frozen conversion in the custom metadata of the backing index, so that a consistent repository is used for the whole process. This commit enhances the conversion faculties to use this repository name.