diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java index 3b07a319e1682..1c07ebec57a29 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java @@ -16,10 +16,11 @@ import java.io.Closeable; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.elasticsearch.logging.LogManager.getLogger; @@ -35,32 +36,34 @@ class DlmFrozenTransitionExecutor implements Closeable { private static final Logger logger = getLogger(DlmFrozenTransitionExecutor.class); - private static final String EXECUTOR_NAME = "dlm-frozen-transition"; - private final Set runningTransitions; - private final int maxConcurrency; + private final Map submittedTransitions; private final ExecutorService executor; + private final int maxConcurrency; + private final int maxQueueSize; - DlmFrozenTransitionExecutor(int maxConcurrency, Settings settings) { - this.runningTransitions = ConcurrentHashMap.newKeySet(maxConcurrency); + DlmFrozenTransitionExecutor(int maxConcurrency, int maxQueueSize, Settings settings) { this.maxConcurrency = maxConcurrency; - this.executor = EsExecutors.newFixed( - EXECUTOR_NAME, - maxConcurrency, - -1, - EsExecutors.daemonThreadFactory(settings, EXECUTOR_NAME), - new ThreadContext(settings), - EsExecutors.TaskTrackingConfig.DEFAULT - ); + this.maxQueueSize = maxQueueSize; + this.submittedTransitions = new ConcurrentHashMap<>(maxQueueSize); + ThreadFactory esThreadFactory = EsExecutors.daemonThreadFactory(settings, EXECUTOR_NAME); + this.executor = EsExecutors.newFixed(EXECUTOR_NAME, maxConcurrency, maxQueueSize, r -> { + Thread thread = esThreadFactory.newThread(r); + if (r instanceof WrappedDlmFrozenTransitionRunnable runnable) { + String name = thread.getName(); + thread.setName(name + "[" + runnable.getIndexName() + "]"); + } + return thread; + }, new ThreadContext(settings), EsExecutors.TaskTrackingConfig.DEFAULT); } - public boolean isTransitionRunning(String indexName) { - return runningTransitions.contains(indexName); + public boolean transitionSubmitted(String indexName) { + return submittedTransitions.containsKey(indexName); } public boolean hasCapacity() { - return runningTransitions.size() < maxConcurrency; + return submittedTransitions.size() < (maxConcurrency + maxQueueSize); } public List shutdownNow() { @@ -69,36 +72,54 @@ public List shutdownNow() { public Future submit(DlmFrozenTransitionRunnable task) { final String indexName = task.getIndexName(); - runningTransitions.add(indexName); + submittedTransitions.put(indexName, false); try { return executor.submit(wrapRunnable(task)); } catch (Exception e) { - runningTransitions.remove(indexName); + submittedTransitions.remove(indexName); throw e; } } /** * Wraps the task with index tracking and error handling. Ensures the index name is always removed from - * {@link #runningTransitions} when the thread completes, whether successfully or with an error. + * {@link #submittedTransitions} when the thread completes, whether successfully or with an error. */ private Runnable wrapRunnable(DlmFrozenTransitionRunnable task) { - return () -> { + return new WrappedDlmFrozenTransitionRunnable(task); + } + + @Override + public void close() { + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + } + + private class WrappedDlmFrozenTransitionRunnable implements Runnable { + private final DlmFrozenTransitionRunnable task; + + private WrappedDlmFrozenTransitionRunnable(DlmFrozenTransitionRunnable task) { + this.task = task; + } + + @Override + public void run() { final String indexName = task.getIndexName(); try { logger.debug("Starting transition for index [{}]", indexName); + Boolean previousValue = submittedTransitions.put(indexName, true); + assert Boolean.FALSE.equals(previousValue) + : "expected the previous value to exist and be false, but it was " + previousValue; task.run(); logger.debug("Transition completed for index [{}]", indexName); } catch (Exception ex) { logger.error(() -> Strings.format("Error executing transition for index [%s]", indexName), ex); } finally { - runningTransitions.remove(indexName); + submittedTransitions.remove(indexName); } - }; - } + } - @Override - public void close() { - ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + private String getIndexName() { + return task.getIndexName(); + } } } diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java index ea36a140943da..1d15d61a4c589 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java @@ -38,7 +38,11 @@ public Collection createComponents(PluginServices services) { @Override public List> getSettings() { if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { - return List.of(DlmFrozenTransitionService.POLL_INTERVAL_SETTING, DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING); + return List.of( + DlmFrozenTransitionService.POLL_INTERVAL_SETTING, + DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING, + DlmFrozenTransitionService.MAX_QUEUE_SIZE + ); } else { return List.of(); } diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java index 769b0b4e9da8c..f9245c6282076 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; @@ -58,6 +59,14 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable { Setting.Property.NodeScope ); + static final Setting MAX_QUEUE_SIZE = Setting.intSetting( + "dlm.frozen_transition.max_queue_size", + 500, + 1, + 10000, + Setting.Property.NodeScope + ); + private final ClusterService clusterService; private final AtomicBoolean isMaster = new AtomicBoolean(false); private ScheduledExecutorService schedulerThreadExecutor; @@ -65,6 +74,7 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable { private final AtomicBoolean closing = new AtomicBoolean(false); private final TimeValue pollInterval; private final int maxConcurrency; + private final int maxQueueSize; private final long initialDelayMillis; private final BiFunction transitionRunnableFactory; @@ -93,6 +103,7 @@ private DlmFrozenTransitionService( this.clusterService = clusterService; this.pollInterval = POLL_INTERVAL_SETTING.get(clusterService.getSettings()); this.maxConcurrency = MAX_CONCURRENCY_SETTING.get(clusterService.getSettings()); + this.maxQueueSize = MAX_QUEUE_SIZE.get(clusterService.getSettings()); this.transitionRunnableFactory = transitionRunnableFactory; this.initialDelayMillis = initialDelayMillis; } @@ -124,12 +135,10 @@ public void clusterChanged(ClusterChangedEvent event) { private void startThreadPools() { synchronized (this) { if (closing.get() == false) { - transitionExecutor = new DlmFrozenTransitionExecutor(maxConcurrency, clusterService.getSettings()); - schedulerThreadExecutor = Executors.newSingleThreadScheduledExecutor(r -> { - var thread = new Thread(r, "dlm-frozen-transition-scheduler"); - thread.setDaemon(true); - return thread; - }); + transitionExecutor = new DlmFrozenTransitionExecutor(maxConcurrency, maxQueueSize, clusterService.getSettings()); + schedulerThreadExecutor = Executors.newSingleThreadScheduledExecutor( + EsExecutors.daemonThreadFactory(clusterService.getSettings(), "dlm-frozen-transition-scheduler") + ); schedulerThreadExecutor.scheduleAtFixedRate( this::checkForFrozenIndices, initialDelayMillis, @@ -204,7 +213,7 @@ void checkForFrozenIndices() { } if (indexMarkedForFrozen(projectMetadata.index(index))) { logger.debug("Frozen index to process detected: {}", index); - if (executor.isTransitionRunning(index.getName())) { + if (executor.transitionSubmitted(index.getName())) { logger.debug("Transition already running for index [{}], skipping", index); continue; } else if (executor.hasCapacity() == false) { diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/plugin-metadata/entitlement-policy.yaml b/x-pack/plugin/dlm-frozen-transition/src/main/plugin-metadata/entitlement-policy.yaml deleted file mode 100644 index 2eb0d0dbd9881..0000000000000 --- a/x-pack/plugin/dlm-frozen-transition/src/main/plugin-metadata/entitlement-policy.yaml +++ /dev/null @@ -1,2 +0,0 @@ -ALL-UNNAMED: - - manage_threads diff --git a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java index a173d474e8337..162b7a7245c7d 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java +++ b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java @@ -8,100 +8,75 @@ package org.elasticsearch.xpack.dlm.frozen; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.WrappedRunnable; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class DlmFrozenTransitionExecutorTests extends ESTestCase { - /** - * Minimal test double implementing {@link DlmFrozenTransitionRunnable} with deterministic, test-controlled behavior. - * The {@code started} latch always counts down when the task begins. Set {@code blockUntil} to a non-released latch - * to hold the task, or leave it at the default (already released) for tasks that complete immediately. - */ - static class TestDlmFrozenTransitionRunnable implements DlmFrozenTransitionRunnable { - private final String indexName; - CountDownLatch started = new CountDownLatch(1); - CountDownLatch blockUntil = new CountDownLatch(0); - Throwable throwOnRun; - - TestDlmFrozenTransitionRunnable(String indexName) { - this.indexName = indexName; - } - - @Override - public String getIndexName() { - return indexName; - } - - @Override - public void run() { - started.countDown(); - try { - blockUntil.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - if (throwOnRun instanceof RuntimeException rte) { - throw rte; - } else if (throwOnRun instanceof Error error) { - throw error; - } - } - } - - public void testIsTransitionRunning() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(2, Settings.EMPTY)) { + public void testTransitionSubmitted() throws Exception { + try (var executor = new DlmFrozenTransitionExecutor(2, 10, Settings.EMPTY)) { var task = new TestDlmFrozenTransitionRunnable("running-index"); task.blockUntil = new CountDownLatch(1); - assertFalse(executor.isTransitionRunning("running-index")); + assertFalse(executor.transitionSubmitted("running-index")); - executor.submit(task); + Future future = executor.submit(task); safeAwait(task.started); - assertTrue(executor.isTransitionRunning("running-index")); - assertFalse(executor.isTransitionRunning("other-index")); + assertTrue(executor.transitionSubmitted("running-index")); + assertFalse(executor.transitionSubmitted("other-index")); task.blockUntil.countDown(); } } public void testTransitionRemovedAfterCompletion() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(2, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(2, 100, Settings.EMPTY)) { var task = new TestDlmFrozenTransitionRunnable("done-index"); executor.submit(task).get(10, TimeUnit.SECONDS); - assertFalse(executor.isTransitionRunning("done-index")); + assertFalse(executor.transitionSubmitted("done-index")); } } public void testTransitionRemovedAfterFailure() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(2, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(2, 100, Settings.EMPTY)) { var runtimeTask = new TestDlmFrozenTransitionRunnable("exception-index"); runtimeTask.throwOnRun = new IllegalStateException("simulated failure"); executor.submit(runtimeTask).get(10, TimeUnit.SECONDS); - assertFalse(executor.isTransitionRunning("exception-index")); + assertFalse(executor.transitionSubmitted("exception-index")); } } public void testHasCapacity() throws Exception { - int maxConcurrency = 2; - try (var executor = new DlmFrozenTransitionExecutor(maxConcurrency, Settings.EMPTY)) { - CountDownLatch tasksStarted = new CountDownLatch(2); + int maxQueue = randomIntBetween(2, 50); + try (var executor = new DlmFrozenTransitionExecutor(1, maxQueue, Settings.EMPTY)) { + CountDownLatch tasksStarted = new CountDownLatch(1); + CountDownLatch firstTaskBlock = new CountDownLatch(1); CountDownLatch taskBlock = new CountDownLatch(1); assertTrue(executor.hasCapacity()); - for (int i = 0; i < maxConcurrency; i++) { + var firstTask = new TestDlmFrozenTransitionRunnable("index-first"); + firstTask.started = tasksStarted; + firstTask.blockUntil = firstTaskBlock; + executor.submit(firstTask); + + // Fill remaining queue + for (int i = 0; i < maxQueue; i++) { var task = new TestDlmFrozenTransitionRunnable("index-" + i); task.started = tasksStarted; task.blockUntil = taskBlock; @@ -111,13 +86,14 @@ public void testHasCapacity() throws Exception { assertTrue(tasksStarted.await(10, TimeUnit.SECONDS)); assertFalse(executor.hasCapacity()); - taskBlock.countDown(); + firstTaskBlock.countDown(); assertBusy(() -> assertTrue(executor.hasCapacity())); + taskBlock.countDown(); } } public void testShutdownNow() throws Exception { - var executor = new DlmFrozenTransitionExecutor(1, Settings.EMPTY); + var executor = new DlmFrozenTransitionExecutor(1, 10, Settings.EMPTY); var task = new TestDlmFrozenTransitionRunnable("block-index"); task.blockUntil = new CountDownLatch(1); @@ -129,19 +105,115 @@ public void testShutdownNow() throws Exception { executor.close(); } + /** + * A task that is submitted to the executor but waiting in the queue (single thread occupied) must still + * be reported as "submitted" by {@link DlmFrozenTransitionExecutor#transitionSubmitted}, because the entry + * is added to {@code submittedTransitions} at submission time, not when the thread actually starts. + * This is the invariant that {@code checkForFrozenIndices} relies on to prevent re-submission of queued tasks. + */ + public void testTransitionSubmittedReturnsTrueForQueuedTask() throws Exception { + try (var executor = new DlmFrozenTransitionExecutor(1, 2, Settings.EMPTY)) { + CountDownLatch firstStarted = new CountDownLatch(1); + CountDownLatch block = new CountDownLatch(1); + + var runningTask = new TestDlmFrozenTransitionRunnable("running-index"); + runningTask.started = firstStarted; + runningTask.blockUntil = block; + executor.submit(runningTask); + safeAwait(firstStarted); // single thread is now occupied + + var queuedTask = new TestDlmFrozenTransitionRunnable("queued-index"); + queuedTask.blockUntil = block; + executor.submit(queuedTask); // sits in the queue; has not started + + assertEquals("Queued task should not have started yet", 1, queuedTask.started.getCount()); + assertTrue("transitionSubmitted must return true for a queued task", executor.transitionSubmitted("queued-index")); + + block.countDown(); + } + } + + /** + * When the underlying executor rejects a submission (queue full), {@link DlmFrozenTransitionExecutor#submit} + * must remove the index from {@code submittedTransitions} before rethrowing, so that a future poll can retry. + */ + public void testSubmitCleansUpEntryOnRejectedExecution() throws Exception { + var executor = new DlmFrozenTransitionExecutor(1, 1, Settings.EMPTY); + try { + CountDownLatch block = new CountDownLatch(1); + CountDownLatch firstStarted = new CountDownLatch(1); + + var runningTask = new TestDlmFrozenTransitionRunnable("running-index"); + runningTask.started = firstStarted; + runningTask.blockUntil = block; + executor.submit(runningTask); + safeAwait(firstStarted); // single thread occupied + + var queuedTask = new TestDlmFrozenTransitionRunnable("queued-index"); + queuedTask.blockUntil = block; + executor.submit(queuedTask); // fills the one queue slot + + // Thread and queue are both full; next submit must be rejected + var rejectedTask = new TestDlmFrozenTransitionRunnable("rejected-index"); + expectThrows(RejectedExecutionException.class, () -> executor.submit(rejectedTask)); + + // The cleanup branch in submit() must have removed the entry so the index is no longer tracked + assertFalse("Rejected index must be removed from submittedTransitions", executor.transitionSubmitted("rejected-index")); + + block.countDown(); + } finally { + executor.close(); + } + } + + /** + * {@link DlmFrozenTransitionExecutor#shutdownNow()} must return tasks that were waiting in the queue + * and had not yet started, not only the currently-executing task. + */ + public void testShutdownNowReturnsQueuedTasks() throws Exception { + var executor = new DlmFrozenTransitionExecutor(1, 5, Settings.EMPTY); + CountDownLatch block = new CountDownLatch(1); + CountDownLatch firstStarted = new CountDownLatch(1); + + var runningTask = new TestDlmFrozenTransitionRunnable("running-index"); + runningTask.started = firstStarted; + runningTask.blockUntil = block; + executor.submit(runningTask); + safeAwait(firstStarted); // single thread occupied + + Set submittedFutures = new HashSet<>(3); + for (int i = 0; i < 3; i++) { + var queuedTask = new TestDlmFrozenTransitionRunnable("queued-index-" + i); + queuedTask.blockUntil = block; + submittedFutures.add((Runnable) executor.submit(queuedTask)); + } + + List cancelled = executor.shutdownNow(); + Set unwrappedCancelled = cancelled.stream().map(r -> { + Runnable unwrapped = r; + while (unwrapped instanceof WrappedRunnable wr) { + unwrapped = wr.unwrap(); + } + return unwrapped; + }).collect(Collectors.toSet()); + assertEquals(submittedFutures, unwrappedCancelled); + + executor.close(); + } + /** * Uses a {@link CyclicBarrier} to ensure all submitting threads call {@code submit()} at the same time, * verifying the executor accepts {@code maxConcurrency} simultaneous submissions without rejection. */ public void testSimultaneousSubmissionsFromMultipleThreads() throws Exception { int maxConcurrency = between(2, 50); - try (var executor = new DlmFrozenTransitionExecutor(maxConcurrency, Settings.EMPTY)) { - CyclicBarrier barrier = new CyclicBarrier(maxConcurrency); + try (var executor = new DlmFrozenTransitionExecutor(maxConcurrency, 1, Settings.EMPTY)) { + CyclicBarrier barrier = new CyclicBarrier(maxConcurrency + 1); List> futures = new CopyOnWriteArrayList<>(); List errors = new CopyOnWriteArrayList<>(); - List submitters = new ArrayList<>(maxConcurrency); + List submitters = new ArrayList<>(maxConcurrency + 1); - for (int i = 0; i < maxConcurrency; i++) { + for (int i = 0; i < maxConcurrency + 1; i++) { final String indexName = "simultaneous-" + i; Thread submitter = new Thread(() -> { try { @@ -166,4 +238,41 @@ public void testSimultaneousSubmissionsFromMultipleThreads() throws Exception { } } } + + /** + * Minimal test double implementing {@link DlmFrozenTransitionRunnable} with deterministic, test-controlled behavior. + * The {@code started} latch always counts down when the task begins. Set {@code blockUntil} to a non-released latch + * to hold the task, or leave it at the default (already released) for tasks that complete immediately. + */ + static class TestDlmFrozenTransitionRunnable implements DlmFrozenTransitionRunnable { + private final String indexName; + CountDownLatch started = new CountDownLatch(1); + CountDownLatch blockUntil = new CountDownLatch(0); + Throwable throwOnRun; + + TestDlmFrozenTransitionRunnable(String indexName) { + this.indexName = indexName; + } + + @Override + public String getIndexName() { + return indexName; + } + + @Override + public void run() { + started.countDown(); + try { + blockUntil.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + if (throwOnRun instanceof RuntimeException rte) { + throw rte; + } else if (throwOnRun instanceof Error error) { + throw error; + } + } + } } diff --git a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java index c66eb4f42a3d0..c5c4090fe6e43 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java +++ b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java @@ -83,12 +83,17 @@ public void setupTest() { Set> settingSet = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); settingSet.add(DlmFrozenTransitionService.POLL_INTERVAL_SETTING); settingSet.add(DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING); + settingSet.add(DlmFrozenTransitionService.MAX_QUEUE_SIZE); threadPool = new TestThreadPool(getTestName()); + // Set max_queue_size equal to max_concurrency so that capacity tests remain valid: once maxConcurrency * 2 + // tasks have been submitted, hasCapacity() returns false. + int maxConcurrency = DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING.getDefault(Settings.EMPTY); + Settings settings = Settings.builder().put("dlm.frozen_transition.max_queue_size", maxConcurrency).build(); clusterService = createClusterService( threadPool, DiscoveryNodeUtils.create("node", "node"), - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, settingSet) + settings, + new ClusterSettings(settings, settingSet) ); } @@ -274,18 +279,23 @@ public void testAlreadyRunningIndexIsNotSubmittedAgain() throws Exception { } public void testCheckForFrozenIndicesReturnsEarlyWhenCapacityExhausted() throws Exception { - int maxConcurrency = DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING.getDefault(Settings.EMPTY); + int maxConcurrency = DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING.get(clusterService.getSettings()); + int maxQueue = DlmFrozenTransitionService.MAX_QUEUE_SIZE.get(clusterService.getSettings()); + + int maxJobs = maxConcurrency + maxQueue; + CountDownLatch blockUntil = new CountDownLatch(1); - CountDownLatch tasksStarted = new CountDownLatch(maxConcurrency); + CountDownLatch allSubmitted = new CountDownLatch(maxJobs); List submittedIndices = new CopyOnWriteArrayList<>(); var service = new DlmFrozenTransitionService(clusterService, (indexName, pid) -> { submittedIndices.add(indexName); - return new TestDlmFrozenTransitionRunnable(indexName, blockUntil, tasksStarted); + allSubmitted.countDown(); + return new TestDlmFrozenTransitionRunnable(indexName, blockUntil); }); try { - // Start with exactly maxConcurrency marked indices so the initial poll fills capacity without rejection + // Start with exactly maxJobs marked indices so the initial poll fills capacity without rejection ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); - for (int i = 1; i <= maxConcurrency; i++) { + for (int i = 1; i <= maxJobs; i++) { String dsName = "frozen-ds-" + i; addDataStream(projectBuilder, dsName, createMarkedIndex(dsName)); } @@ -293,9 +303,9 @@ public void testCheckForFrozenIndicesReturnsEarlyWhenCapacityExhausted() throws setProjectState(projectBuilder); service.clusterChanged(createMasterEvent(true)); - // Wait for all tasks to start (capacity now exhausted) - safeAwait(tasksStarted); - assertEquals(maxConcurrency, submittedIndices.size()); + // Wait until all maxJobs tasks have been accepted by the executor (capacity now exhausted) + safeAwait(allSubmitted); + assertEquals(maxJobs, submittedIndices.size()); // Add more marked indices to the cluster state ProjectMetadata existingProject = clusterService.state().metadata().projects().values().iterator().next(); @@ -306,21 +316,72 @@ public void testCheckForFrozenIndicesReturnsEarlyWhenCapacityExhausted() throws // Manually trigger poll — running indices are skipped, new index hits capacity check and returns early service.checkForFrozenIndices(); - assertEquals("No additional indices should be submitted when capacity is exhausted", maxConcurrency, submittedIndices.size()); + assertEquals("No additional indices should be submitted when capacity is exhausted", maxJobs, submittedIndices.size()); } finally { blockUntil.countDown(); service.close(); } } - public void testPollIntervalMinimum() { - Settings tooLow = Settings.builder().put("dlm.frozen_transition.poll_interval", "30s").build(); - expectThrows(IllegalArgumentException.class, () -> DlmFrozenTransitionService.POLL_INTERVAL_SETTING.get(tooLow)); - } + /** + * A task that has been submitted but is waiting in the executor queue (single thread occupied, second index + * queued but not yet started) must be treated as "running" by the de-duplication guard in + * {@code checkForFrozenIndices}, so a second poll does not submit it a second time. + */ + public void testAlreadyQueuedIndexIsNotResubmitted() throws Exception { + Set> allSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + allSettings.add(DlmFrozenTransitionService.POLL_INTERVAL_SETTING); + allSettings.add(DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING); + allSettings.add(DlmFrozenTransitionService.MAX_QUEUE_SIZE); + Settings singleThreadSettings = Settings.builder() + .put("dlm.frozen_transition.max_concurrency", 1) + .put("dlm.frozen_transition.max_queue_size", 5) + .build(); + ClusterService localClusterService = createClusterService( + threadPool, + DiscoveryNodeUtils.create("local-node", "local-node"), + singleThreadSettings, + new ClusterSettings(singleThreadSettings, allSettings) + ); + try { + CountDownLatch blockUntil = new CountDownLatch(1); + List submittedIndices = new CopyOnWriteArrayList<>(); - public void testMaxConcurrencyMinimum() { - Settings tooLow = Settings.builder().put("dlm.frozen_transition.max_concurrency", 0).build(); - expectThrows(IllegalArgumentException.class, () -> DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING.get(tooLow)); + IndexMetadata firstIndex = createMarkedIndex("first-ds"); + IndexMetadata secondIndex = createMarkedIndex("second-ds"); + ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); + addDataStream(projectBuilder, "first-ds", firstIndex); + addDataStream(projectBuilder, "second-ds", secondIndex); + setState(localClusterService, ClusterState.builder(localClusterService.state()).putProjectMetadata(projectBuilder).build()); + + var service = new DlmFrozenTransitionService(localClusterService, (indexName, pid) -> { + submittedIndices.add(indexName); + return new TestDlmFrozenTransitionRunnable(indexName, blockUntil); + }); + try { + service.clusterChanged(createMasterEventFor(localClusterService, true)); + + // Wait until both indices have been accepted by the executor — one is running on the single + // thread, the other is waiting in the queue. We poll submittedTransitions directly so that + // the assertion is independent of which index happened to be scheduled first. + assertBusy(() -> { + DlmFrozenTransitionExecutor exec = service.getTransitionExecutor(); + assertNotNull(exec); + assertTrue(exec.transitionSubmitted(firstIndex.getIndex().getName())); + assertTrue(exec.transitionSubmitted(secondIndex.getIndex().getName())); + }); + assertEquals(2, submittedIndices.size()); + + // A second poll must skip both: one is running, the other is queued but not yet started. + service.checkForFrozenIndices(); + assertEquals("Queued index must not be submitted a second time", 2, submittedIndices.size()); + } finally { + blockUntil.countDown(); + service.close(); + } + } finally { + localClusterService.close(); + } } private IndexMetadata createMarkedIndex(String dataStreamName) { @@ -353,6 +414,10 @@ private void setProjectState(ProjectMetadata.Builder projectBuilder) { } private ClusterChangedEvent createMasterEvent(boolean isMaster) { + return createMasterEventFor(clusterService, isMaster); + } + + private ClusterChangedEvent createMasterEventFor(ClusterService cs, boolean isMaster) { var localNode = DiscoveryNodeUtils.create("local-node", "local-node"); var otherNode = DiscoveryNodeUtils.create("other-node", "other-node"); @@ -364,10 +429,7 @@ private ClusterChangedEvent createMasterEvent(boolean isMaster) { nodesBuilder.masterNodeId("other-node"); } - ClusterState newState = ClusterState.builder(clusterService.state()) - .nodes(nodesBuilder) - .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) - .build(); + ClusterState newState = ClusterState.builder(cs.state()).nodes(nodesBuilder).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); ClusterState previousState = ClusterState.builder(new ClusterName("test")) .nodes(