From 15809d444c7dd1755733a5733d89d93ce3e13527 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 23 Jul 2025 21:18:15 +0300 Subject: [PATCH 1/3] Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767) This adds a test that covers relocation for shards that are running a force merge. Relates https://github.com/elastic/elasticsearch/issues/93503 --- .../index/engine/MergeWithLowDiskSpaceIT.java | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index aab77f64f9fb7..63e87e759a419 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -10,16 +10,23 @@ package org.elasticsearch.index.engine; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.DiskUsageIntegTestCase; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import 
org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; @@ -28,16 +35,20 @@ import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase { + private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); protected static long MERGE_DISK_HIGH_WATERMARK_BYTES; @BeforeClass @@ -235,6 +246,115 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception { assertAcked(indicesAdmin().prepareDelete(indexName).get()); } + public void testRelocationWhileForceMerging() throws Exception { + final String node1 = internalCluster().startNode(); + ensureStableCluster(1); + setTotalSpace(node1, Long.MAX_VALUE); + String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + prepareCreate(indexName, indexSettings(1, 0)).get(); + // get current disk space usage (for all indices on the node) + IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); + long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); + // restrict the total disk space such that the next merge does not have sufficient disk space 
+ long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L); + setTotalSpace(node1, insufficientTotalDiskSpace); + // node stats' FS stats should report that there is insufficient disk space available + assertBusy(() -> { + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get(); + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace)); + assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES)); + }); + int indexingRounds = randomIntBetween(5, 10); + while (indexingRounds-- > 0) { + indexRandom( + true, + true, + true, + false, + IntStream.range(1, randomIntBetween(5, 10)) + .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))) + .toList() + ); + } + // the max segments argument makes it a blocking call + ActionFuture forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName) + .setMaxNumSegments(1) + .execute(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1) + .getThreadPoolMergeExecutorService(); + TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1); + assertBusy(() -> { + // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1)); + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + // telemetry says that there are indeed some segments enqueued to be merged + testTelemetryPlugin.collect(); + assertThat( + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), 
+ greaterThan(0L) + ); + // but still no merges are currently running + assertThat( + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), + equalTo(0L) + ); + // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running") + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get(); + long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); + assertThat(currentMergeCount, equalTo(0L)); + }); + // the force merge call is still blocked + assertFalse(forceMergeBeforeRelocationFuture.isCancelled()); + assertFalse(forceMergeBeforeRelocationFuture.isDone()); + // merge executor still confirms merging is blocked due to insufficient disk space + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get(); + // the index should have more than one segment at this stage + assertThat( + indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(), + iterableWithSize(greaterThan(1)) + ); + // start another node + final String node2 = internalCluster().startNode(); + ensureStableCluster(2); + setTotalSpace(node2, Long.MAX_VALUE); + // relocate the shard from node1 to node2 + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID)); + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .get(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + // the force merge call is now unblocked + assertBusy(() -> { +
assertTrue(forceMergeBeforeRelocationFuture.isDone()); + assertFalse(forceMergeBeforeRelocationFuture.isCancelled()); + }); + // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that does not guarantee a single segment, + // so let's trigger a force merge to 1 segment again (this one should succeed promptly) + indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); + IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get(); + // assert there's only one segment now + assertThat( + indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(), + iterableWithSize(1) + ); + // also assert that the shard was indeed moved to a different node + assertThat( + indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting() + .currentNodeId(), + not( + equalTo( + indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting() + .currentNodeId() + ) + ) + ); + } + public void setTotalSpace(String dataNodeName, long totalSpace) { getTestFileStore(dataNodeName).setTotalSpace(totalSpace); refreshClusterInfo(); From 3c320b57422e015f94960c7a745f997336b5423d Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Mon, 28 Jul 2025 14:04:26 +0300 Subject: [PATCH 2/3] Fix MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131806) The index settings are randomized in the test, but this test suite doesn't work when indices have a custom data path.
--- .../elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 63e87e759a419..36438fdea2a9a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -251,7 +251,10 @@ public void testRelocationWhileForceMerging() throws Exception { ensureStableCluster(1); setTotalSpace(node1, Long.MAX_VALUE); String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - prepareCreate(indexName, indexSettings(1, 0)).get(); + createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() + ); // get current disk space usage (for all indices on the node) IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); From c84ebc23c8ae1f1ea3ccf04aa9fed713aa3f628e Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Mon, 28 Jul 2025 16:18:19 +0300 Subject: [PATCH 3/3] Fix compilation --- .../index/engine/MergeWithLowDiskSpaceIT.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 36438fdea2a9a..77eff47b2f3e2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -20,7 +20,6 @@ import 
org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.DiskUsageIntegTestCase; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.Priority; @@ -287,22 +286,10 @@ public void testRelocationWhileForceMerging() throws Exception { .execute(); ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1) .getThreadPoolMergeExecutorService(); - TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1); assertBusy(() -> { // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1)); assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); - // telemetry says that there are indeed some segments enqueued to be merged - testTelemetryPlugin.collect(); - assertThat( - testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), - greaterThan(0L) - ); - // but still no merges are currently running - assertThat( - testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), - equalTo(0L) - ); // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running") IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get(); long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); @@ -324,7 +311,7 @@ public void testRelocationWhileForceMerging() throws Exception { ensureStableCluster(2); setTotalSpace(node2, Long.MAX_VALUE); 
// relocate the shard from node1 to node2 - ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID)); + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2)); ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) .setWaitForNoRelocatingShards(true)