From e45b2efb694964057619b086d24cc7a1bdc22e39 Mon Sep 17 00:00:00 2001 From: happysubin Date: Fri, 23 May 2025 19:01:08 +0900 Subject: [PATCH 01/11] [ILM] Fix TSDS unfollow timing with WaitUntilTimeSeriesEndTimePassesStep --- .../xpack/core/ilm/UnfollowAction.java | 25 ++++++++++----- .../xpack/core/ilm/UnfollowActionTests.java | 31 +++++++++++-------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java index a513dfdd13434..37955d48d63bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { StepKey preUnfollowKey = new StepKey(phase, NAME, CONDITIONAL_UNFOLLOW_STEP); StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME); StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME); + StepKey waitUntilTimeSeriesEndTimePassesStep = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME); StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME); StepKey unfollowFollowerIndex = new StepKey(phase, NAME, UnfollowFollowerIndexStep.NAME); @@ -60,13 +62,22 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { return customIndexMetadata == null; }); WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks); - WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client); - PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); - CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); - UnfollowFollowerIndexStep step5 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client); - OpenIndexStep step6 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client); - WaitForIndexColorStep step7 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW); - return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7); + WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep( + waitForFollowShardTasks, + waitUntilTimeSeriesEndTimePassesStep, + client + ); + WaitUntilTimeSeriesEndTimePassesStep step3 = new WaitUntilTimeSeriesEndTimePassesStep( + waitUntilTimeSeriesEndTimePassesStep, + pauseFollowerIndex, + Instant::now + ); + PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); + CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); + UnfollowFollowerIndexStep step6 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client); + OpenIndexStep step7 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client); + WaitForIndexColorStep step8 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW); + return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7, step8); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java index 66aacfdc68667..e9d9b8a3c81aa 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java @@ -51,16 +51,17 @@ public void testToSteps() { ); List steps = action.toSteps(null, phase, nextStepKey); assertThat(steps, notNullValue()); - assertThat(steps.size(), equalTo(8)); + assertThat(steps.size(), equalTo(9)); StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowAction.CONDITIONAL_UNFOLLOW_STEP); StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME); StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME); - StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME); - StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME); - StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME); - StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME); - StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME); + StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME); + StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME); + StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME); + StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME); + StepKey expectedNinthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME); BranchingStep firstStep = (BranchingStep) steps.get(0); assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey)); @@ -73,26 +74,30 @@ public void testToSteps() { assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey)); assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey)); - PauseFollowerIndexStep fourthStep = (PauseFollowerIndexStep) steps.get(3); + WaitUntilTimeSeriesEndTimePassesStep fourthStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(3); assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey)); assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey)); - CloseFollowerIndexStep fifthStep = (CloseFollowerIndexStep) steps.get(4); + PauseFollowerIndexStep fifthStep = (PauseFollowerIndexStep) steps.get(4); assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey)); assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey)); - UnfollowFollowerIndexStep sixthStep = (UnfollowFollowerIndexStep) steps.get(5); + CloseFollowerIndexStep sixthStep = (CloseFollowerIndexStep) steps.get(5); assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey)); assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey)); - OpenIndexStep seventhStep = (OpenIndexStep) steps.get(6); + UnfollowFollowerIndexStep seventhStep = (UnfollowFollowerIndexStep) steps.get(6); assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey)); assertThat(seventhStep.getNextStepKey(), equalTo(expectedEighthStepKey)); - WaitForIndexColorStep eighthStep = (WaitForIndexColorStep) steps.get(7); - assertThat(eighthStep.getColor(), is(ClusterHealthStatus.YELLOW)); + OpenIndexStep eighthStep = (OpenIndexStep) steps.get(7); assertThat(eighthStep.getKey(), equalTo(expectedEighthStepKey)); - assertThat(eighthStep.getNextStepKey(), equalTo(nextStepKey)); + assertThat(eighthStep.getNextStepKey(), equalTo(expectedNinthStepKey)); + + WaitForIndexColorStep ninth = (WaitForIndexColorStep) steps.get(8); + assertThat(ninth.getColor(), is(ClusterHealthStatus.YELLOW)); + assertThat(ninth.getKey(), equalTo(expectedNinthStepKey)); + assertThat(ninth.getNextStepKey(), equalTo(nextStepKey)); } @Override From e759e975d06f172a50b5ea2df7f0924f67e94faf Mon Sep 17 00:00:00 2001 From: happysubin Date: Thu, 29 May 2025 21:23:38 +0900 Subject: [PATCH 02/11] [ILM] Move WaitUntilTimeSeriesEndTimePassesStep before WaitForFollowShardTasksStep --- .../xpack/core/ilm/UnfollowAction.java | 18 ++++++++++-------- .../xpack/core/ilm/UnfollowActionTests.java | 8 ++++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java index 37955d48d63bd..d0c69cc27d443 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java @@ -45,8 +45,8 @@ private UnfollowAction() {} public List toSteps(Client client, String phase, StepKey nextStepKey) { StepKey preUnfollowKey = new StepKey(phase, NAME, CONDITIONAL_UNFOLLOW_STEP); StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME); - StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME); StepKey waitUntilTimeSeriesEndTimePassesStep = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME); StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME); StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME); StepKey unfollowFollowerIndex = new StepKey(phase, NAME, UnfollowFollowerIndexStep.NAME); @@ -61,16 +61,18 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { // if the index has no CCR metadata we'll skip the unfollow action completely return customIndexMetadata == null; }); - WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks); - WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep( - waitForFollowShardTasks, + WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitUntilTimeSeriesEndTimePassesStep); + + WaitUntilTimeSeriesEndTimePassesStep step2 = new WaitUntilTimeSeriesEndTimePassesStep( waitUntilTimeSeriesEndTimePassesStep, - client + waitForFollowShardTasks, + Instant::now ); - WaitUntilTimeSeriesEndTimePassesStep step3 = new WaitUntilTimeSeriesEndTimePassesStep( - waitUntilTimeSeriesEndTimePassesStep, + + WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep( + waitForFollowShardTasks, pauseFollowerIndex, - Instant::now + client ); PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java index e9d9b8a3c81aa..3820eaa19445f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java @@ -55,8 +55,8 @@ public void testToSteps() { StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowAction.CONDITIONAL_UNFOLLOW_STEP); StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME); - StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME); - StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME); StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME); StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME); StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME); @@ -70,11 +70,11 @@ public void testToSteps() { assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey)); assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey)); - WaitForFollowShardTasksStep thirdStep = (WaitForFollowShardTasksStep) steps.get(2); + WaitUntilTimeSeriesEndTimePassesStep thirdStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(2); assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey)); assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey)); - WaitUntilTimeSeriesEndTimePassesStep fourthStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(3); + WaitForFollowShardTasksStep fourthStep = (WaitForFollowShardTasksStep) steps.get(3); assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey)); assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey)); From 4634388b7e58811cbc4d37632fc94e19c52a5cac Mon Sep 17 00:00:00 2001 From: happysubin Date: Sat, 7 Jun 2025 14:42:28 +0900 Subject: [PATCH 03/11] style: apply code format to UnfollowAction.java --- .../org/elasticsearch/xpack/core/ilm/UnfollowAction.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java index d0c69cc27d443..304968a0cc51a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java @@ -69,11 +69,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { Instant::now ); - WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep( - waitForFollowShardTasks, - pauseFollowerIndex, - client - ); + WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client); PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); UnfollowFollowerIndexStep step6 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client); From 99d6ecf4129ccbc3407b4dcee77cf5d2f18ffdd4 Mon Sep 17 00:00:00 2001 From: happysubin Date: Sat, 7 Jun 2025 14:43:08 +0900 Subject: [PATCH 04/11] test: Add integration test for TSDB rollover and CCR sync during ILM wait step --- .../xpack/ilm/CCRIndexLifecycleIT.java | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 66bcb1b201cc2..149d6246e2ecc 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -12,11 +12,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.RestStatus; @@ -28,9 +31,11 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.UnfollowAction; +import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.List; import java.util.Locale; import java.util.Map; @@ -39,7 +44,9 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -47,6 +54,37 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class); + private static final String TSDB_INDEX_TEMPLATE = """ + { + "index_patterns": ["%s*"], + "data_stream": {}, + "template": { + "settings":{ + "index": { + "number_of_replicas": 0, + "number_of_shards": 1, + "routing_path": ["metricset"], + "mode": "time_series" + }, + "index.lifecycle.name": "%s" + }, + "mappings":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + }, + "volume": { + "type": "double", + "time_series_metric": "gauge" + } + } + } + } + }"""; public void testBasicCCRAndILMIntegration() throws Exception { String indexName = "logs-1"; @@ -533,6 +571,94 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception { } } + @SuppressWarnings({ "checkstyle:LineLength", "unchecked" }) + public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception { + String indexPattern = "tsdb-index-"; + String dataStream = "tsdb-index-cpu"; + String policyName = "tsdb-policy"; + + if ("leader".equals(targetCluster)) { + putILMPolicy(policyName, null, 1, null); + Request templateRequest = new Request("PUT", "/_index_template/tsdb_template"); + templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName)); + assertOK(client().performRequest(templateRequest)); + } else if ("follow".equals(targetCluster)) { + putILMPolicy(policyName, null, 1, null); + + Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern"); + createAutoFollowRequest.setJsonEntity(""" + { + "leader_index_patterns": [ ".ds-tsdb-index-*" ], + "remote_cluster": "leader_cluster", + "read_poll_timeout": "1000ms", + "follow_index_pattern": "{{leader_index}}" + }"""); + assertOK(client().performRequest(createAutoFollowRequest)); + + try (RestClient leaderClient = buildLeaderClient()) { + String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now()); + index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); + + String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0); + assertBusy(() -> { assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName))); }); + + // rollover + Request rolloverRequest = new Request("POST", "/" + dataStream + "/_rollover"); + rolloverRequest.setJsonEntity(""" + { + "conditions": { + "max_docs": "1" + } + }"""); + leaderClient.performRequest(rolloverRequest); + + assertBusy(() -> { + assertThat( + "index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses", + explainIndex(client(), backingIndexName).get("step"), + is(WaitUntilTimeSeriesEndTimePassesStep.NAME) + ); + + assertThat(explainIndex(client(), backingIndexName).get("step_info"), is(notNullValue())); + assertThat( + (String) ((Map) explainIndex(client(), backingIndexName).get("step_info")).get("message"), + containsString("Waiting until the index's time series end time lapses") + ); + }, 30, TimeUnit.SECONDS); + + int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName); + + // Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep + String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()) + .format(Instant.now().plusSeconds(30)); + + for (int i = 0; i < 5; i++) { + index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i); + } + + // Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep + assertBusy(() -> { + int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName); + int currentFollowerDocCount = getDocCount(client(), backingIndexName); + + assertThat( + "Leader should have more documents than initially", + currentLeaderDocCount, + greaterThan(initialLeaderDocCount) + ); + assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount)); + + // Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep + assertThat( + "Index should still be in WaitUntilTimeSeriesEndTimePassesStep", + explainIndex(client(), backingIndexName).get("step"), + is(WaitUntilTimeSeriesEndTimePassesStep.NAME) + ); + }, 30, TimeUnit.SECONDS); + } + } + } + private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException { logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed); Request request = new Request("PUT", "/_cluster/settings"); @@ -839,4 +965,33 @@ private static String getShrinkIndexName(RestClient client, String originalIndex : "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]"; return shrunkenIndexName[0]; } + + private static Map explainIndex(RestClient client, String indexName) throws IOException { + RequestOptions consumeWarningsOptions = RequestOptions.DEFAULT.toBuilder() + .setWarningsHandler(warnings -> warnings.isEmpty() == false && List.of(""" + [indices.lifecycle.rollover.only_if_has_documents] setting was deprecated in Elasticsearch \ + and will be removed in a future release. \ + See the deprecation documentation for the next major version.""").equals(warnings) == false) + .build(); + + Request explainRequest = new Request("GET", indexName + "/_ilm/explain"); + explainRequest.setOptions(consumeWarningsOptions); + Response response = client.performRequest(explainRequest); + Map responseMap; + try (InputStream is = response.getEntity().getContent()) { + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + + @SuppressWarnings("unchecked") + Map> indexResponse = ((Map>) responseMap.get("indices")); + return indexResponse.get(indexName); + } + + private static int getDocCount(RestClient client, String indexName) throws IOException { + Request countRequest = new Request("GET", "/" + indexName + "/_count"); + Response response = client.performRequest(countRequest); + Map result = entityAsMap(response); + System.out.println("result = " + result); + return (int) result.get("count"); + } } From cb5e90842c2dd3ab704bf832fb4ec5852abeda48 Mon Sep 17 00:00:00 2001 From: happysubin Date: Mon, 9 Jun 2025 18:40:56 +0900 Subject: [PATCH 05/11] fix: remove println(), line length warning supress --- .../java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 149d6246e2ecc..1fc863c7d6777 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -571,7 +571,7 @@ public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception { } } - @SuppressWarnings({ "checkstyle:LineLength", "unchecked" }) + @SuppressWarnings("unchecked") public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception { String indexPattern = "tsdb-index-"; String dataStream = "tsdb-index-cpu"; @@ -991,7 +991,6 @@ private static int getDocCount(RestClient client, String indexName) throws IOExc Request countRequest = new Request("GET", "/" + indexName + "/_count"); Response response = client.performRequest(countRequest); Map result = entityAsMap(response); - System.out.println("result = " + result); return (int) result.get("count"); } } From 08fd1786b281e4eb43e2b05840f85b97e97764b0 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 10 Jun 2025 10:58:04 +0300 Subject: [PATCH 06/11] Add changelog --- docs/changelog/128361.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/128361.yaml diff --git a/docs/changelog/128361.yaml b/docs/changelog/128361.yaml new file mode 100644 index 0000000000000..901c1141afe90 --- /dev/null +++ b/docs/changelog/128361.yaml @@ -0,0 +1,6 @@ +pr: 128361 +summary: The follower index should wait until the time series end time passes before unfollowing the leader index. +area: ILM+SLM +type: bug +issues: + - 128129 From 47f22554bb04df4d577e50bd299e9e7077e8510e Mon Sep 17 00:00:00 2001 From: happysubin Date: Tue, 10 Jun 2025 20:29:44 +0900 Subject: [PATCH 07/11] refactor: improve test code quality and readability --- .../xpack/ilm/CCRIndexLifecycleIT.java | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 1fc863c7d6777..11d2665fb917e 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Request; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; @@ -588,7 +587,7 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern"); createAutoFollowRequest.setJsonEntity(""" { - "leader_index_patterns": [ ".ds-tsdb-index-*" ], + "leader_index_patterns": [ "tsdb-index-*" ], "remote_cluster": "leader_cluster", "read_poll_timeout": "1000ms", "follow_index_pattern": "{{leader_index}}" @@ -600,28 +599,23 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0); - assertBusy(() -> { assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName))); }); + assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName)))); // rollover Request rolloverRequest = new Request("POST", "/" + dataStream + "/_rollover"); - rolloverRequest.setJsonEntity(""" - { - "conditions": { - "max_docs": "1" - } - }"""); leaderClient.performRequest(rolloverRequest); assertBusy(() -> { + Map indexExplanation = explainIndex(client(), backingIndexName); assertThat( "index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses", - explainIndex(client(), backingIndexName).get("step"), + indexExplanation.get("step"), is(WaitUntilTimeSeriesEndTimePassesStep.NAME) ); - assertThat(explainIndex(client(), backingIndexName).get("step_info"), is(notNullValue())); + assertThat(indexExplanation.get("step_info"), is(notNullValue())); assertThat( - (String) ((Map) explainIndex(client(), backingIndexName).get("step_info")).get("message"), + (String) ((Map) indexExplanation.get("step_info")).get("message"), containsString("Waiting until the index's time series end time lapses") ); }, 30, TimeUnit.SECONDS); @@ -967,15 +961,7 @@ private static String getShrinkIndexName(RestClient client, String originalIndex } private static Map explainIndex(RestClient client, String indexName) throws IOException { - RequestOptions consumeWarningsOptions = RequestOptions.DEFAULT.toBuilder() - .setWarningsHandler(warnings -> warnings.isEmpty() == false && List.of(""" - [indices.lifecycle.rollover.only_if_has_documents] setting was deprecated in Elasticsearch \ - and will be removed in a future release. \ - See the deprecation documentation for the next major version.""").equals(warnings) == false) - .build(); - Request explainRequest = new Request("GET", indexName + "/_ilm/explain"); - explainRequest.setOptions(consumeWarningsOptions); Response response = client.performRequest(explainRequest); Map responseMap; try (InputStream is = response.getEntity().getContent()) { From feab346a12f4fec736ca85677d1a256a6cdf93c8 Mon Sep 17 00:00:00 2001 From: happysubin Date: Wed, 11 Jun 2025 18:04:05 +0900 Subject: [PATCH 08/11] fix: increase assertBusy timeout --- .../java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 11d2665fb917e..775f468fc8593 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -618,7 +618,7 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc (String) ((Map) indexExplanation.get("step_info")).get("message"), containsString("Waiting until the index's time series end time lapses") ); - }, 30, TimeUnit.SECONDS); + }, 5, TimeUnit.MINUTES); int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName); @@ -648,7 +648,7 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc explainIndex(client(), backingIndexName).get("step"), is(WaitUntilTimeSeriesEndTimePassesStep.NAME) ); - }, 30, TimeUnit.SECONDS); + }, 5, TimeUnit.MINUTES); } } } From 9598115d23697ef2265a4dc614d1be0eed3291e9 Mon Sep 17 00:00:00 2001 From: happysubin Date: Thu, 12 Jun 2025 20:33:44 +0900 Subject: [PATCH 09/11] fix: Address flakiness in follower cluster rollover test --- .../elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 775f468fc8593..8114112f881bb 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -582,7 +582,7 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName)); assertOK(client().performRequest(templateRequest)); } else if ("follow".equals(targetCluster)) { - putILMPolicy(policyName, null, 1, null); + putUnfollowOnlyPolicy(client(), policyName); Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern"); createAutoFollowRequest.setJsonEntity(""" @@ -601,10 +601,6 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0); assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName)))); - // rollover - Request rolloverRequest = new Request("POST", "/" + dataStream + "/_rollover"); - leaderClient.performRequest(rolloverRequest); - assertBusy(() -> { Map indexExplanation = explainIndex(client(), backingIndexName); assertThat( @@ -618,7 +614,7 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc (String) ((Map) indexExplanation.get("step_info")).get("message"), containsString("Waiting until the index's time series end time lapses") ); - }, 5, TimeUnit.MINUTES); + }, 30, TimeUnit.SECONDS); int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName); @@ -648,7 +644,7 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc explainIndex(client(), backingIndexName).get("step"), is(WaitUntilTimeSeriesEndTimePassesStep.NAME) ); - }, 5, TimeUnit.MINUTES); + }, 30, TimeUnit.SECONDS); } } } From 18bcd3de23efb8895cb3e5047af9609104e58c31 Mon Sep 17 00:00:00 2001 From: happysubin Date: Fri, 13 Jun 2025 19:44:12 +0900 Subject: [PATCH 10/11] docs: explain why certain implementations were chosen --- .../org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 8114112f881bb..9cdb09f0edd24 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -582,6 +582,9 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName)); assertOK(client().performRequest(templateRequest)); } else if ("follow".equals(targetCluster)) { + // Use unfollow-only policy for follower cluster instead of regular ILM policy + // Follower clusters should not have their own rollover actions as they are meant + // to follow the rollover behavior of the leader index, not initiate their own rollovers putUnfollowOnlyPolicy(client(), policyName); Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern"); @@ -596,6 +599,10 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc try (RestClient leaderClient = buildLeaderClient()) { String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now()); + + // Wait for ILM rollover instead of manual rollover to ensure 'index.lifecycle.indexing_complete' is set + // Manual rollover removed as it doesn't properly set lifecycle completion flag + // Let ILM naturally trigger rollover when document is indexed index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0); From 1ad5d31e02804af8389e29e795304811a0326ba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=95=88=EC=88=98=EB=B9=88?= <76802855+happysubin@users.noreply.github.com> Date: Tue, 17 Jun 2025 00:17:49 +0900 Subject: [PATCH 11/11] Improve ILM rollover comment clarity Co-authored-by: Mary Gouseti --- .../org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java index 9cdb09f0edd24..0b26c0c323abe 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java @@ -600,9 +600,8 @@ public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exc try (RestClient leaderClient = buildLeaderClient()) { String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now()); - // Wait for ILM rollover instead of manual rollover to ensure 'index.lifecycle.indexing_complete' is set - // Manual rollover removed as it doesn't properly set lifecycle completion flag - // Let ILM naturally trigger rollover when document is indexed + // Index a document on the leader index, this should trigger an ILM rollover. + // This will ensure that 'index.lifecycle.indexing_complete' is set. index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0);