From c231fcff3d0324d78f48f08c580d77287bbb1fa1 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 7 Jan 2026 19:55:14 +1100 Subject: [PATCH 01/15] logging hot threads on large queue of the management threadpool Resolves: ES-13904 --- .../concurrent/ThreadPoolHotThreadsIT.java | 136 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 4 + .../common/util/concurrent/EsExecutors.java | 32 +++-- .../util/concurrent/EsThreadPoolExecutor.java | 71 ++++++++- ...utionTimeTrackingEsThreadPoolExecutor.java | 17 ++- .../DefaultBuiltInExecutorBuilders.java | 13 +- .../threadpool/ScalingExecutorBuilder.java | 72 +++++++++- .../util/concurrent/EsExecutorsTests.java | 12 +- ...TimeTrackingEsThreadPoolExecutorTests.java | 18 ++- 9 files changed, 348 insertions(+), 27 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java new file mode 100644 index 0000000000000..ae385a9471781 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.MockLog.assertThatLogger; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class ThreadPoolHotThreadsIT extends ESIntegTestCase { + + public void testNotHotThreadsForPersistedManagementThreadPoolQueueSizeWhenDisabled() { + final Settings.Builder settingsBuilder = Settings.builder() + .put(HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.getKey(), TimeValue.timeValueSeconds(1)); + if (randomBoolean()) { + settingsBuilder.put(HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.getKey(), 0); + } + doTestHotThreadsForPersistedManagementThreadPoolQueueSize(settingsBuilder.build()); + } + + public void testHotThreadsForPersistedManagementThreadPoolQueueSizeWhenEnabled() { + doTestHotThreadsForPersistedManagementThreadPoolQueueSize( + Settings.builder() + .put(HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.getKey(), between(8, 16)) + .put(HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .build() + ); + } + + private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings nodeSettings) { + final int sizeThreshold = HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.get(nodeSettings); + final TimeValue durationThreshold = 
HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.get(nodeSettings); + final boolean enabled = sizeThreshold > 0; + + final String node = internalCluster().startNode(nodeSettings); + final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, node); + final var managementExecutor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.MANAGEMENT); + final int maxManagementThreads = threadPool.info(ThreadPool.Names.MANAGEMENT).getMax(); + + final var executingLatch = new CountDownLatch(maxManagementThreads); + final var continueLatch = new CountDownLatch(1); + + final Runnable blockingTask = () -> { + executingLatch.countDown(); + safeAwait(continueLatch); + }; + + // Activate all available threads + for (int i = 0; i < maxManagementThreads; i++) { + if (randomBoolean()) { + managementExecutor.execute(blockingTask); + } else { + managementExecutor.submit(blockingTask); + } + } + + safeAwait(executingLatch); + assertThat(managementExecutor.getActiveCount(), equalTo(maxManagementThreads)); + logger.info("--> all blocking tasks are running"); + + // Fill the queue up to the threshold + for (int i = 0; i < sizeThreshold; i++) { + if (randomBoolean()) { + managementExecutor.execute(() -> {}); + } else { + managementExecutor.submit(() -> {}); + } + } + assertThat(managementExecutor.getQueue().size(), equalTo(sizeThreshold)); + waitForTimeElapse(durationThreshold); + + try { + if (enabled) { + assertThatLogger(() -> { + if (randomBoolean()) { + managementExecutor.execute(() -> {}); + } else { + managementExecutor.submit(() -> {}); + } + }, + EsThreadPoolExecutor.class, + new MockLog.SeenEventExpectation( + "should log hot threads", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "ThreadPoolExecutor [*/management] queue size [" + sizeThreshold + "] has been over threshold for *" + ) + ); + } + + assertThatLogger(() -> { + if (randomBoolean()) { + managementExecutor.execute(() -> {}); + } else { + managementExecutor.submit(() -> 
{}); + } + }, + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation( + "should not log again since it's frequency capped", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "*" + ) + ); + } finally { + continueLatch.countDown(); + } + } + + private static void waitForTimeElapse(TimeValue duration) { + final var startTime = System.currentTimeMillis(); + // Ensure we wait for at least the specified duration by loop and explicit check the elapsed time to avoid + // potential inconsistency between slept time and System.currentTimeMillis() difference. + while (System.currentTimeMillis() - startTime < duration.millis()) { + safeSleep(duration); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 15c2ae238f00d..91cdafabf8c99 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -138,6 +138,7 @@ import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterPortSettings; import org.elasticsearch.transport.RemoteClusterSettings; @@ -561,6 +562,9 @@ public void apply(Settings value, Settings current, Settings previous) { ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING, ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING, + ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING, + ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING, + ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING, 
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 5ed4136edd49a..8073ab01fb2a0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import java.util.List; @@ -119,12 +120,14 @@ public static EsThreadPoolExecutor newScaling( boolean rejectAfterShutdown, ThreadFactory threadFactory, ThreadContext contextHolder, - TaskTrackingConfig config + TaskTrackingConfig config, + HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty), // probing the worker pool prevents this. 
boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue; + final ForceQueuePolicy queuePolicy = new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool); if (config.trackExecutionTime()) { return new TaskExecutionTimeTrackingEsThreadPoolExecutor( name, @@ -135,9 +138,10 @@ public static EsThreadPoolExecutor newScaling( queue, TimedRunnable::new, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), + queuePolicy, contextHolder, - config + config, + hotThreadsOnLargeQueueConfig ); } else { return new EsThreadPoolExecutor( @@ -148,8 +152,9 @@ public static EsThreadPoolExecutor newScaling( unit, queue, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), - contextHolder + queuePolicy, + contextHolder, + hotThreadsOnLargeQueueConfig ); } } @@ -188,7 +193,8 @@ public static EsThreadPoolExecutor newScaling( rejectAfterShutdown, threadFactory, contextHolder, - TaskTrackingConfig.DO_NOT_TRACK + TaskTrackingConfig.DO_NOT_TRACK, + HotThreadsOnLargeQueueConfig.DISABLED ); } @@ -221,7 +227,8 @@ public static EsThreadPoolExecutor newFixed( threadFactory, rejectedExecutionHandler, contextHolder, - config + config, + HotThreadsOnLargeQueueConfig.DISABLED ); } else { return new EsThreadPoolExecutor( @@ -233,7 +240,8 @@ public static EsThreadPoolExecutor newFixed( queue, threadFactory, rejectedExecutionHandler, - contextHolder + contextHolder, + HotThreadsOnLargeQueueConfig.DISABLED ); } } @@ -654,4 +662,12 @@ public TaskTrackingConfig build() { } } + public record HotThreadsOnLargeQueueConfig(int sizeThreshold, long durationThresholdInMillis, TimeValue interval) { + + public static final HotThreadsOnLargeQueueConfig DISABLED = new HotThreadsOnLargeQueueConfig(0, -1, TimeValue.MINUS_ONE); + + public boolean isEnabled() { + return sizeThreshold > 0; + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java 
b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index ad4616692850e..33d9d3bccf6c0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -9,9 +9,14 @@ package org.elasticsearch.common.util.concurrent; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.FrequencyCappedAction; +import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.monitor.jvm.HotThreads; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -45,6 +50,17 @@ public void run() {} */ private final String name; + private final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig; + + // There may be racing on updating this field. It's OK since hot threads logging is very coarse grained time wise + // and can tolerate some inaccuracies. 
+ private volatile long startTimeOfLargeQueue = -1L; + + private final FrequencyCappedAction hotThreadsLogger = new FrequencyCappedAction( + System::currentTimeMillis, + TimeValue.ZERO + ); + EsThreadPoolExecutor( String name, int corePoolSize, @@ -55,7 +71,18 @@ public void run() {} ThreadFactory threadFactory, ThreadContext contextHolder ) { - this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder); + this( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + new EsAbortPolicy(), + contextHolder, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED + ); } @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") @@ -68,11 +95,14 @@ public void run() {} BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, - ThreadContext contextHolder + ThreadContext contextHolder, + EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; + this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; + this.hotThreadsLogger.setMinInterval(hotThreadsOnLargeQueueConfig.interval()); } @Override @@ -87,7 +117,42 @@ public void setMaximumPoolSize(int maximumPoolSize) { @Override public void execute(Runnable command) { - final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE; + final boolean isNotProbe = command != WORKER_PROBE; + final Runnable wrappedRunnable = isNotProbe ? wrapRunnable(command) : WORKER_PROBE; + + if (isNotProbe && hotThreadsOnLargeQueueConfig.isEnabled()) { + int queueSize = getQueue().size(); + // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well, + // thus reaching the threshold of 500. 
It won't log right away due to the duration threshold. + if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) { + final long startTime = startTimeOfLargeQueue; + final long now = System.currentTimeMillis(); + if (startTime == -1) { + startTimeOfLargeQueue = now; + } else { + final long duration = now - startTime; + if (duration >= hotThreadsOnLargeQueueConfig.durationThresholdInMillis()) { + hotThreadsLogger.maybeExecute(() -> { + HotThreads.logLocalHotThreads( + logger, + Level.INFO, + "ThreadPoolExecutor [" + + name + + "] queue size [" + + queueSize + + "] has been over threshold for [" + + TimeValue.timeValueMillis(duration) + + "]", + ReferenceDocs.LOGGING + ); + }); + } + } + } else { + startTimeOfLargeQueue = -1L; + } + } + try { super.execute(wrappedRunnable); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index f66b6384211b1..4ce1e05a597cd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.metrics.ExponentialBucketHistogram; +import org.elasticsearch.common.util.concurrent.EsExecutors.HotThreadsOnLargeQueueConfig; import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.core.TimeValue; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; @@ -72,9 +73,21 @@ public enum UtilizationTrackingPurpose { ThreadFactory threadFactory, RejectedExecutionHandler handler, ThreadContext contextHolder, - TaskTrackingConfig trackingConfig + TaskTrackingConfig trackingConfig, + 
HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { - super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); + super( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + handler, + contextHolder, + hotThreadsOnLargeQueueConfig + ); this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 23728fe68bb0f..6d20baae31d92 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -20,6 +20,9 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA; @@ -32,6 +35,9 @@ public Map getBuilders(Settings settings, int allocated final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512); final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings); + final int hotThreadsOnLargeQueueSizeThreshold = HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.get(settings); + final 
TimeValue hotThreadsOnLargeQueueDurationThreshold = HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.get(settings); + final TimeValue hotThreadsOnLargeQueueInterval = HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING.get(settings); Map result = new HashMap<>(); result.put( @@ -115,7 +121,12 @@ public Map getBuilders(Settings settings, int allocated 1, ThreadPool.boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5), - false + false, + new EsExecutors.HotThreadsOnLargeQueueConfig( + hotThreadsOnLargeQueueSizeThreshold, + hotThreadsOnLargeQueueDurationThreshold.millis(), + hotThreadsOnLargeQueueInterval + ) ) ); result.put( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java index 0fb2f1e471d0b..4d62027fd4edc 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java @@ -32,11 +32,32 @@ */ public final class ScalingExecutorBuilder extends ExecutorBuilder { + public static final Setting HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING = Setting.intSetting( + "thread_pools.hot_threads_on_large_queue.size_threshold", + 0, // default 0 means disabled + 0, + Setting.Property.NodeScope + ); + public static final Setting HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING = Setting.timeSetting( + "thread_pools.hot_threads_on_large_queue.duration_threshold", + TimeValue.timeValueMinutes(2), + TimeValue.timeValueSeconds(1), + Setting.Property.NodeScope + ); + + public static final Setting HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING = Setting.timeSetting( + "thread_pools.hot_threads_on_large_queue.interval", + TimeValue.timeValueMinutes(60), + TimeValue.timeValueMinutes(10), + Setting.Property.NodeScope + ); + private final Setting coreSetting; private final Setting maxSetting; private final Setting keepAliveSetting; private final 
boolean rejectAfterShutdown; private final EsExecutors.TaskTrackingConfig trackingConfig; + private final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig; /** * Construct a scaling executor builder; the settings will have the @@ -59,6 +80,38 @@ public ScalingExecutorBuilder( this(name, core, max, keepAlive, rejectAfterShutdown, "thread_pool." + name); } + /** + * Construct a scaling executor builder; the settings will have the + * key prefix "thread_pool." followed by the executor name. + * + * @param name the name of the executor + * @param core the minimum number of threads in the pool + * @param max the maximum number of threads in the pool + * @param keepAlive the time that spare threads above {@code core} + * threads will be kept alive + * @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown + * @param hotThreadsOnLargeQueueConfig configuration for indicating whether hot threads should be logged on large queue size + */ + public ScalingExecutorBuilder( + final String name, + final int core, + final int max, + final TimeValue keepAlive, + final boolean rejectAfterShutdown, + final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig + ) { + this( + name, + core, + max, + keepAlive, + rejectAfterShutdown, + "thread_pool." + name, + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK, + hotThreadsOnLargeQueueConfig + ); + } + /** * Construct a scaling executor builder; the settings will have the * specified key prefix. 
@@ -79,7 +132,16 @@ public ScalingExecutorBuilder( final boolean rejectAfterShutdown, final String prefix ) { - this(name, core, max, keepAlive, rejectAfterShutdown, prefix, EsExecutors.TaskTrackingConfig.DO_NOT_TRACK); + this( + name, + core, + max, + keepAlive, + rejectAfterShutdown, + prefix, + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED + ); } /** @@ -94,6 +156,7 @@ public ScalingExecutorBuilder( * @param prefix the prefix for the settings keys * @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown * @param trackingConfig configuration that'll indicate if we should track statistics about task execution time + * @param hotThreadsOnLargeQueueConfig configuration for indicating whether hot threads should be logged on large queue size */ public ScalingExecutorBuilder( final String name, @@ -102,7 +165,8 @@ public ScalingExecutorBuilder( final TimeValue keepAlive, final boolean rejectAfterShutdown, final String prefix, - final EsExecutors.TaskTrackingConfig trackingConfig + final EsExecutors.TaskTrackingConfig trackingConfig, + final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { super(name, false); this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope); @@ -115,6 +179,7 @@ public ScalingExecutorBuilder( ); this.rejectAfterShutdown = rejectAfterShutdown; this.trackingConfig = trackingConfig; + this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; } @Override @@ -147,7 +212,8 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th rejectAfterShutdown, threadFactory, threadContext, - trackingConfig + trackingConfig, + hotThreadsOnLargeQueueConfig ); return new ThreadPool.ExecutorHolder(executor, info); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java 
b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index dd7274a5e0186..20be93291c5f1 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -690,7 +690,8 @@ public void testScalingWithTaskTimeTracking() { threadContext, randomBoolean() ? EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build() - : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); } @@ -719,7 +720,8 @@ public void testScalingWithTaskTimeTracking() { randomBoolean(), TestEsExecutors.testOnlyDaemonThreadFactory("test"), threadContext, - DO_NOT_TRACK + DO_NOT_TRACK, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); assertThat(pool, instanceOf(EsThreadPoolExecutor.class)); } @@ -840,7 +842,8 @@ public void testScalingWithEmptyCoreAndWorkerPoolProbing() { new EsExecutors.ExecutorScalingQueue<>(), TestEsExecutors.testOnlyDaemonThreadFactory(getTestName()), new EsExecutors.ForceQueuePolicy(true, true), - threadContext + threadContext, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ) ); } @@ -857,7 +860,8 @@ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() { new EsExecutors.ExecutorScalingQueue<>(), TestEsExecutors.testOnlyDaemonThreadFactory(getTestName()), new EsExecutors.ForceQueuePolicy(true, true), - threadContext + threadContext, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ) ); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java 
b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 0c379dc26321f..558b143678ce4 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -57,7 +57,8 @@ public void testExecutionEWMACalculation() throws Exception { .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) .build() - : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -129,7 +130,8 @@ public void testFrontOfQueueLatency() throws Exception { : EsExecutors.TaskTrackingConfig.builder() .trackMaxQueueLatency() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); try { executor.prestartAllCoreThreads(); @@ -196,7 +198,8 @@ public void testMaxDequeuedQueueLatency() throws Exception { : EsExecutors.TaskTrackingConfig.builder() .trackMaxQueueLatency() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); try { executor.prestartAllCoreThreads(); @@ -242,7 +245,8 @@ public void testExceptionThrowingTask() throws Exception { .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) .build() - : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + : 
EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -276,7 +280,8 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.TaskTrackingConfig.builder() .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -313,7 +318,8 @@ public void testQueueLatencyHistogramMetrics() { EsExecutors.TaskTrackingConfig.builder() .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); executor.setupMetrics(meterRegistry, threadPoolName); From 8088379e34cc9a4ac9052e16d2f61a7f4af8bb2b Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 7 Jan 2026 09:27:10 +0000 Subject: [PATCH 02/15] [CI] Auto commit changes from spotless --- .../common/util/concurrent/EsThreadPoolExecutor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 33d9d3bccf6c0..fff11ff6a043d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -56,10 +56,7 @@ public void run() {} // and can tolerate some inaccuracies. 
private volatile long startTimeOfLargeQueue = -1L; - private final FrequencyCappedAction hotThreadsLogger = new FrequencyCappedAction( - System::currentTimeMillis, - TimeValue.ZERO - ); + private final FrequencyCappedAction hotThreadsLogger = new FrequencyCappedAction(System::currentTimeMillis, TimeValue.ZERO); EsThreadPoolExecutor( String name, From d2c8b1565eb85efab1e4927931efba366a0b19ba Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 6 Jan 2026 23:11:59 -0800 Subject: [PATCH 03/15] Start exchange sink fetchers concurrently (#140196) We should start fetching pages from the exchange sink asynchronously for each client. However, the current code should not be an issue, as we go asynchronous when sending messages in production. Therefore, this is marked as a non-issue rather than a bug. --- .../exchange/ExchangeSourceHandler.java | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java index 4d8b7e5ad1035..d980c48e33afb 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java @@ -8,8 +8,8 @@ package org.elasticsearch.compute.operator.exchange; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.SubscribableListener; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.compute.EsqlRefCountingListener; import org.elasticsearch.compute.data.Page; @@ -257,32 +257,26 @@ public void addRemoteSink( final ActionListener sinkListener = 
ActionListener.assertAtLeastOnce( ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId))) ); - final Releasable emptySink = addEmptySink(); - fetchExecutor.execute(new AbstractRunnable() { - @Override - public void onAfter() { - emptySink.close(); - } - - @Override - public void onFailure(Exception e) { - if (failFast) { - aborted = true; - } - buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading - remoteSink.close(ActionListener.running(() -> sinkListener.onFailure(e))); - } - - @Override - protected void doRun() { - try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) { - for (int i = 0; i < instances; i++) { - var fetcher = new RemoteSinkFetcher(remoteSink, failFast, onPageFetched, refs.acquire()); + try (var refs = new EsqlRefCountingListener(ActionListener.releaseBefore(addEmptySink(), sinkListener))) { + for (int i = 0; i < instances; i++) { + fetchExecutor.execute(new ActionRunnable<>(refs.acquire()) { + @Override + protected void doRun() { + var fetcher = new RemoteSinkFetcher(remoteSink, failFast, onPageFetched, this.listener); fetcher.fetchPage(); } - } + + @Override + public void onFailure(Exception e) { + if (failFast) { + aborted = true; + } + buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading + remoteSink.close(this.listener); + } + }); } - }); + } } /** From a920ab504bd74c0d8ca5dcf84e90ebb5389ca930 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine <58790826+elasticsearchmachine@users.noreply.github.com> Date: Wed, 7 Jan 2026 09:00:59 +0100 Subject: [PATCH 04/15] Mute org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorIT testCreatesEisChatCompletion_DoesNotRemoveEndpointWhenNoLongerAuthorized #138480 --- muted-tests.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index b2ae38deddc6e..3ca20480ece11 100644 --- 
a/muted-tests.yml +++ b/muted-tests.yml @@ -366,6 +366,9 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test072RunEsAsDifferentUserAndGroup issue: https://github.com/elastic/elasticsearch/issues/140127 +- class: org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorIT + method: testCreatesEisChatCompletion_DoesNotRemoveEndpointWhenNoLongerAuthorized + issue: https://github.com/elastic/elasticsearch/issues/138480 # Examples: # From 0c5c6c2e7ab12b9d9cbccd6f905c9c4a51113955 Mon Sep 17 00:00:00 2001 From: Simon Cooper Date: Wed, 7 Jan 2026 09:10:00 +0000 Subject: [PATCH 05/15] Allow a slight difference in rescored docs (#139931) Follow on from #139769 to update some more tests for FP differences --- muted-tests.yml | 15 ---- .../search/query/RescoreKnnVectorQueryIT.java | 23 +++--- .../vectors/RescoreKnnVectorQueryTests.java | 79 ++++++++++++------- 3 files changed, 65 insertions(+), 52 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 3ca20480ece11..a162f106237a5 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -333,33 +333,18 @@ tests: - class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT method: test {yaml=search.retrievers/result-diversification/10_mmr_result_diversification_retriever/Test MMR result diversification single index float type} issue: https://github.com/elastic/elasticsearch/issues/139848 -- class: org.elasticsearch.search.vectors.RescoreKnnVectorQueryTests - method: testRescoreDocs - issue: https://github.com/elastic/elasticsearch/issues/139859 -- class: org.elasticsearch.search.vectors.RescoreKnnVectorQueryTests - method: testRescoreSingleAndBulkEquality - issue: https://github.com/elastic/elasticsearch/issues/139869 -- class: org.elasticsearch.search.query.RescoreKnnVectorQueryIT - method: testKnnQueryRescore - issue: https://github.com/elastic/elasticsearch/issues/139912 - class: org.elasticsearch.snapshots.SnapshotStressTestsIT method: testRandomActivities issue: 
https://github.com/elastic/elasticsearch/issues/139974 - class: org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobContainerRetriesTests method: testReadBlobWithReadTimeouts issue: https://github.com/elastic/elasticsearch/issues/139995 -- class: org.elasticsearch.search.query.RescoreKnnVectorQueryIT - method: testKnnRetriever - issue: https://github.com/elastic/elasticsearch/issues/140014 - class: org.elasticsearch.packaging.test.DockerTests method: test070BindMountCustomPathConfAndJvmOptions issue: https://github.com/elastic/elasticsearch/issues/131366 - class: org.elasticsearch.packaging.test.DockerTests method: test140CgroupOsStatsAreAvailable issue: https://github.com/elastic/elasticsearch/issues/131372 -- class: org.elasticsearch.search.query.RescoreKnnVectorQueryIT - method: testKnnSearchRescore - issue: https://github.com/elastic/elasticsearch/issues/140078 - class: org.elasticsearch.packaging.test.DockerTests method: test130JavaHasCorrectOwnership issue: https://github.com/elastic/elasticsearch/issues/131369 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/query/RescoreKnnVectorQueryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/query/RescoreKnnVectorQueryIT.java index 00ad202af78ec..a91ebf8d45baa 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/query/RescoreKnnVectorQueryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/query/RescoreKnnVectorQueryIT.java @@ -38,7 +38,6 @@ import org.junit.Before; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -47,11 +46,10 @@ import java.util.Map; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.closeTo; public class RescoreKnnVectorQueryIT extends ESIntegTestCase { @@ -60,6 +58,12 @@ public class RescoreKnnVectorQueryIT extends ESIntegTestCase { public static final String VECTOR_SCORE_SCRIPT = "vector_scoring"; public static final String QUERY_VECTOR_PARAM = "query_vector"; + /* + * Original KNN scoring and rescoring can use slightly different calculation methods, + * so there may be a very slight difference in the scores after rescoring. + */ + private static final float DELTA = 1e-6f; + @Override protected Collection> nodePlugins() { return Collections.singleton(CustomScriptPlugin.class); @@ -87,7 +91,7 @@ public void setup() throws IOException { Arrays.stream(VectorIndexType.values()) .filter(VectorIndexType::isQuantized) .map(t -> t.name().toLowerCase(Locale.ROOT)) - .collect(Collectors.toCollection(ArrayList::new)) + .toList() ); XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() @@ -196,15 +200,12 @@ private void testKnnRescore(BiFunction { compareWithExactSearch(knnResponse, queryVector, numDocs); }); + assertNoFailuresAndResponse(requestBuilder, knnResponse -> compareWithExactSearch(knnResponse, queryVector, numDocs)); } private static void compareWithExactSearch(SearchResponse knnResponse, float[] queryVector, int docCount) { @@ -228,7 +229,11 @@ private static void compareWithExactSearch(SearchResponse knnResponse, float[] q if (i >= exactHits.length) { fail("Knn doc not found in exact search"); } - assertThat("Real score is not the same as rescored score", knnHit.getScore(), equalTo(exactHits[i].getScore())); + assertThat( + "Real score is not the same as rescored score", + (double) knnHit.getScore(), + closeTo(exactHits[i].getScore(), DELTA) + ); } }); } diff --git a/server/src/test/java/org/elasticsearch/search/vectors/RescoreKnnVectorQueryTests.java 
b/server/src/test/java/org/elasticsearch/search/vectors/RescoreKnnVectorQueryTests.java index d418595ac8611..e732383919743 100644 --- a/server/src/test/java/org/elasticsearch/search/vectors/RescoreKnnVectorQueryTests.java +++ b/server/src/test/java/org/elasticsearch/search/vectors/RescoreKnnVectorQueryTests.java @@ -53,10 +53,15 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.index.codec.vectors.diskbbq.ES920DiskBBQVectorsFormat.DEFAULT_CENTROIDS_PER_PARENT_CLUSTER; import static org.elasticsearch.index.codec.vectors.diskbbq.ES920DiskBBQVectorsFormat.DEFAULT_VECTORS_PER_CLUSTER; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -64,12 +69,18 @@ public class RescoreKnnVectorQueryTests extends ESTestCase { public static final String FIELD_NAME = "float_vector"; + /* + * Original KNN scoring and rescoring can use slightly different calculation methods, + * so there may be a very slight difference in the scores after rescoring. 
+ */ + private static final float DELTA = 1e-6f; + public void testRescoreDocs() throws Exception { int numDocs = randomIntBetween(10, 100); int numDims = randomIntBetween(5, 100); int k = randomIntBetween(1, numDocs - 1); - var queryVector = randomVector(numDims); + float[] queryVector = randomVector(numDims); List innerQueries = new ArrayList<>(); innerQueries.add(new KnnFloatVectorQuery(FIELD_NAME, randomVector(numDims), (int) (k * randomFloatBetween(1.0f, 10.0f, true)))); innerQueries.add( @@ -83,8 +94,7 @@ public void testRescoreDocs() throws Exception { addRandomDocuments(numDocs, d, numDims); try (IndexReader reader = DirectoryReader.open(d)) { - - for (var innerQuery : innerQueries) { + for (Query innerQuery : innerQueries) { RescoreKnnVectorQuery rescoreKnnVectorQuery = RescoreKnnVectorQuery.fromInnerQuery( FIELD_NAME, queryVector, @@ -95,8 +105,26 @@ public void testRescoreDocs() throws Exception { ); IndexSearcher searcher = newSearcher(reader, true, false); + TopDocs rescoredDocs = searcher.search(rescoreKnnVectorQuery, numDocs); - assertThat(rescoredDocs.scoreDocs.length, equalTo(k)); + assertThat(rescoredDocs.scoreDocs, arrayWithSize(k)); + + if (innerQuery instanceof KnnFloatVectorQuery) { + // check that at least one doc has its score changed, indicating rescoring has happened + TopDocs unrescoredDocs = searcher.search(innerQuery, numDocs); + Map rescoredScores = Arrays.stream(rescoredDocs.scoreDocs) + .collect(Collectors.toMap(sd -> sd.doc, sd -> (double) sd.score)); + + boolean changed = false; + for (ScoreDoc unrescored : unrescoredDocs.scoreDocs) { + Double rescored = rescoredScores.get(unrescored.doc); + if (rescored != null && Math.abs(rescored - unrescored.score) > DELTA) { + changed = true; + break; + } + } + assertTrue("No docs had their scores changed", changed); + } // Get real scores DoubleValuesSource valueSource = new FullPrecisionFloatVectorSimilarityValuesSource( @@ -118,7 +146,11 @@ public void testRescoreDocs() throws Exception { 
if (i >= realScoreDocs.length) { fail("Rescored doc not found in real score docs"); } - assertThat("Real score is not the same as rescored score", rescoreDoc.score, equalTo(realScoreDocs[i].score)); + assertThat( + "Real score is not the same as rescored score", + (double) rescoreDoc.score, + closeTo(realScoreDocs[i].score, DELTA) + ); } } } @@ -130,7 +162,7 @@ public void testRescoreSingleAndBulkEquality() throws Exception { int numDims = randomIntBetween(5, 100); int k = randomIntBetween(1, numDocs - 1); - var queryVector = randomVector(numDims); + float[] queryVector = randomVector(numDims); List innerQueries = new ArrayList<>(); innerQueries.add(new KnnFloatVectorQuery(FIELD_NAME, randomVector(numDims), (int) (k * randomFloatBetween(1.0f, 10.0f, true)))); @@ -144,7 +176,7 @@ public void testRescoreSingleAndBulkEquality() throws Exception { try (Directory d = newDirectory()) { addRandomDocuments(numDocs, d, numDims); try (DirectoryReader reader = DirectoryReader.open(d)) { - for (var innerQuery : innerQueries) { + for (Query innerQuery : innerQueries) { RescoreKnnVectorQuery rescoreKnnVectorQuery = RescoreKnnVectorQuery.fromInnerQuery( FIELD_NAME, queryVector, @@ -156,7 +188,7 @@ public void testRescoreSingleAndBulkEquality() throws Exception { IndexSearcher searcher = newSearcher(reader, true, false); TopDocs rescoredDocs = searcher.search(rescoreKnnVectorQuery, numDocs); - assertThat(rescoredDocs.scoreDocs.length, equalTo(k)); + assertThat(rescoredDocs.scoreDocs, arrayWithSize(k)); searcher = newSearcher(new SingleVectorQueryIndexReader(reader), true, false); rescoreKnnVectorQuery = RescoreKnnVectorQuery.fromInnerQuery( @@ -168,14 +200,14 @@ public void testRescoreSingleAndBulkEquality() throws Exception { innerQuery ); TopDocs singleRescored = searcher.search(rescoreKnnVectorQuery, numDocs); - assertThat(singleRescored.scoreDocs.length, equalTo(k)); + assertThat(singleRescored.scoreDocs, arrayWithSize(k)); // Get real scores ScoreDoc[] singleRescoreDocs = 
singleRescored.scoreDocs; int i = 0; for (ScoreDoc rescoreDoc : rescoredDocs.scoreDocs) { assertThat(rescoreDoc.doc, equalTo(singleRescoreDocs[i].doc)); - assertThat(rescoreDoc.score, equalTo(singleRescoreDocs[i].score)); + assertThat((double) rescoreDoc.score, closeTo(singleRescoreDocs[i].score, DELTA)); i++; } } @@ -282,28 +314,19 @@ public void profile(QueryProfiler queryProfiler) { private static void addRandomDocuments(int numDocs, Directory d, int numDims) throws IOException { IndexWriterConfig iwc = new IndexWriterConfig(); // Pick codec from quantized vector formats to ensure scores use real scores when using knn rescore + DenseVectorFieldMapper.ElementType elementType = randomFrom( + DenseVectorFieldMapper.ElementType.FLOAT, + DenseVectorFieldMapper.ElementType.BFLOAT16 + ); KnnVectorsFormat format = randomFrom( - new ES920DiskBBQVectorsFormat( - DEFAULT_VECTORS_PER_CLUSTER, - DEFAULT_CENTROIDS_PER_PARENT_CLUSTER, - randomFrom(DenseVectorFieldMapper.ElementType.FLOAT, DenseVectorFieldMapper.ElementType.BFLOAT16), - randomBoolean() - ), - new ES93BinaryQuantizedVectorsFormat( - randomFrom(DenseVectorFieldMapper.ElementType.FLOAT, DenseVectorFieldMapper.ElementType.BFLOAT16), - false - ), - new ES93HnswBinaryQuantizedVectorsFormat( - randomFrom(DenseVectorFieldMapper.ElementType.FLOAT, DenseVectorFieldMapper.ElementType.BFLOAT16), - randomBoolean() - ), - new ES93ScalarQuantizedVectorsFormat( - randomFrom(DenseVectorFieldMapper.ElementType.FLOAT, DenseVectorFieldMapper.ElementType.BFLOAT16) - ), + new ES920DiskBBQVectorsFormat(DEFAULT_VECTORS_PER_CLUSTER, DEFAULT_CENTROIDS_PER_PARENT_CLUSTER, elementType, randomBoolean()), + new ES93BinaryQuantizedVectorsFormat(elementType, false), + new ES93HnswBinaryQuantizedVectorsFormat(elementType, randomBoolean()), + new ES93ScalarQuantizedVectorsFormat(elementType), new ES93HnswScalarQuantizedVectorsFormat( DEFAULT_VECTORS_PER_CLUSTER, DEFAULT_CENTROIDS_PER_PARENT_CLUSTER, - 
randomFrom(DenseVectorFieldMapper.ElementType.FLOAT, DenseVectorFieldMapper.ElementType.BFLOAT16), + elementType, null, 7, false, From ace32251d2656271ad98ffecceca4d113c44a6af Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 7 Jan 2026 21:45:55 +1100 Subject: [PATCH 06/15] fix compilation --- .../org/elasticsearch/xpack/stateless/StatelessPlugin.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java b/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java index 0d7fedc71cfc1..d9c74a216be4b 100644 --- a/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java +++ b/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java @@ -175,7 +175,8 @@ public static ExecutorBuilder[] statelessExecutorBuilders(Settings settings, TimeValue.timeValueMinutes(5), true, SHARD_READ_THREAD_POOL_SETTING, - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build() + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ), new ScalingExecutorBuilder( TRANSLOG_THREAD_POOL, From 53730f298c14dced5c39a99860d4adbb53d51092 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 8 Jan 2026 14:23:05 +1100 Subject: [PATCH 07/15] tests --- .../concurrent/ThreadPoolHotThreadsIT.java | 173 +++++++++++++++--- .../util/concurrent/EsThreadPoolExecutor.java | 90 ++++++--- 2 files changed, 211 insertions(+), 52 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java index ae385a9471781..b9cdadf7d1d88 100644 ---
a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java @@ -17,16 +17,21 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING; import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) public class ThreadPoolHotThreadsIT extends ESIntegTestCase { - public void testNotHotThreadsForPersistedManagementThreadPoolQueueSizeWhenDisabled() { + public void testNotHotThreadsForPersistedManagementThreadPoolQueueSizeWhenDisabled() throws Exception { final Settings.Builder settingsBuilder = Settings.builder() .put(HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.getKey(), TimeValue.timeValueSeconds(1)); if (randomBoolean()) { @@ -35,7 +40,7 @@ public void testNotHotThreadsForPersistedManagementThreadPoolQueueSizeWhenDisabl doTestHotThreadsForPersistedManagementThreadPoolQueueSize(settingsBuilder.build()); } - public void testHotThreadsForPersistedManagementThreadPoolQueueSizeWhenEnabled() { + public void testHotThreadsForPersistedManagementThreadPoolQueueSizeWhenEnabled() throws Exception { doTestHotThreadsForPersistedManagementThreadPoolQueueSize( Settings.builder() .put(HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.getKey(), between(8, 16)) @@ -44,7 +49,7 @@ public 
void testHotThreadsForPersistedManagementThreadPoolQueueSizeWhenEnabled() ); } - private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings nodeSettings) { + private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings nodeSettings) throws Exception { final int sizeThreshold = HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.get(nodeSettings); final TimeValue durationThreshold = HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.get(nodeSettings); final boolean enabled = sizeThreshold > 0; @@ -64,11 +69,7 @@ private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings // Activate all available threads for (int i = 0; i < maxManagementThreads; i++) { - if (randomBoolean()) { - managementExecutor.execute(blockingTask); - } else { - managementExecutor.submit(blockingTask); - } + runTask(managementExecutor, blockingTask); } safeAwait(executingLatch); @@ -77,24 +78,16 @@ private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings // Fill the queue up to the threshold for (int i = 0; i < sizeThreshold; i++) { - if (randomBoolean()) { - managementExecutor.execute(() -> {}); - } else { - managementExecutor.submit(() -> {}); - } + runTask(managementExecutor, () -> {}); } assertThat(managementExecutor.getQueue().size(), equalTo(sizeThreshold)); + assertThat(managementExecutor.getStartTimeOfLargeQueue(), enabled ? 
greaterThan(0L) : equalTo(-1L)); waitForTimeElapse(durationThreshold); try { if (enabled) { - assertThatLogger(() -> { - if (randomBoolean()) { - managementExecutor.execute(() -> {}); - } else { - managementExecutor.submit(() -> {}); - } - }, + assertThatLogger( + () -> runTask(managementExecutor, () -> {}), EsThreadPoolExecutor.class, new MockLog.SeenEventExpectation( "should log hot threads", @@ -103,26 +96,152 @@ private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings "ThreadPoolExecutor [*/management] queue size [" + sizeThreshold + "] has been over threshold for *" ) ); + assertThat(managementExecutor.getStartTimeOfLargeQueue(), greaterThan(0L)); } + assertThatLogger( + () -> runTask(managementExecutor, () -> {}), + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + assertThat(managementExecutor.getStartTimeOfLargeQueue(), enabled ? greaterThan(0L) : equalTo(-1L)); + } finally { + continueLatch.countDown(); + } + + if (enabled) { + assertBusy(() -> assertThat(managementExecutor.getQueue().size(), lessThan(sizeThreshold - 1))); + // Run a final task and the tracking should be cleared since the large queue size is gone + runTask(managementExecutor, () -> {}); + assertThat(managementExecutor.getStartTimeOfLargeQueue(), equalTo(-1L)); + } + } + + public void testEsThreadPoolExecutor() throws Exception { + final long startTime = randomLongBetween(0, 1000); + final var timer = new AtomicLong(startTime); + final String nodeName = randomIdentifier(); + final String threadName = randomIdentifier(); + final int maxThreads = between(1, 5); + final var sizeThreshold = between(10, 100); + final TimeValue durationThreshold = randomTimeValue(1, 5, TimeUnit.MINUTES); + final TimeValue interval = randomTimeValue(30, 60, TimeUnit.MINUTES); + + final var executor = new EsThreadPoolExecutor( + threadName, + 1, + maxThreads, + 5, + TimeUnit.MINUTES, + new 
EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(nodeName, threadName), + new EsExecutors.ForceQueuePolicy(false, false), + new ThreadContext(Settings.EMPTY), + new EsExecutors.HotThreadsOnLargeQueueConfig(sizeThreshold, durationThreshold.millis(), interval), + timer::get + ); + assertThat(executor.getStartTimeOfLargeQueue(), equalTo(-1L)); + + final var executingLatch = new CountDownLatch(maxThreads); + final var continueLatch = new CountDownLatch(1); + + final Runnable blockingTask = () -> { + executingLatch.countDown(); + safeAwait(continueLatch); + }; + + // Activate all available threads + for (int i = 0; i < maxThreads; i++) { + runTask(executor, blockingTask); + } + safeAwait(executingLatch); + assertThat(executor.getActiveCount(), equalTo(maxThreads)); + + try { + // 1. Fill the queue up to the threshold - No logging but tracking should start due to queue size reaching the threshold assertThatLogger(() -> { - if (randomBoolean()) { - managementExecutor.execute(() -> {}); - } else { - managementExecutor.submit(() -> {}); + for (int i = 0; i < sizeThreshold; i++) { + runTask(executor, () -> {}); } + assertThat(executor.getQueue().size(), equalTo(sizeThreshold)); + assertThat(executor.getStartTimeOfLargeQueue(), equalTo(timer.get())); }, EsThreadPoolExecutor.class, - new MockLog.UnseenEventExpectation( - "should not log again since it's frequency capped", + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + + // 2. Queue more tasks but should not see logging yet since duration threshold not met + assertThatLogger( + () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})), + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + + // 3. 
Advance time and we should observe logging when adding more task + final long elapsedMillis1 = durationThreshold.millis() + randomLongBetween(0, 1000); + timer.addAndGet(elapsedMillis1); + assertThatLogger( + () -> runTask(executor, () -> {}), + EsThreadPoolExecutor.class, + new MockLog.SeenEventExpectation( + "should log hot threads", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, - "*" + "ThreadPoolExecutor [" + + threadName + + "] queue size [" + + executor.getQueue().size() + + "] has been over threshold for [" + + TimeValue.timeValueMillis(elapsedMillis1) + + "]*" + ) + ); + + // 4. Add more task and there should be no more logging since logging interval has not passed yet + assertThatLogger( + () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})), + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + + // 5. Advance time to pass the logging interval and we should observe logging again when adding more task + final long elapsedMillis2 = interval.millis() + randomLongBetween(0, 1000); + timer.addAndGet(elapsedMillis2); + assertThatLogger( + () -> runTask(executor, () -> {}), + EsThreadPoolExecutor.class, + new MockLog.SeenEventExpectation( + "should log hot threads", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "ThreadPoolExecutor [" + + threadName + + "] queue size [" + + executor.getQueue().size() + + "] has been over threshold for [" + + TimeValue.timeValueMillis(elapsedMillis1 + elapsedMillis2) + + "]*" ) ); } finally { continueLatch.countDown(); } + + // 6. 
Wait for the queue to drain and add one more task and the tracking should be reset + assertBusy(() -> assertThat(executor.getQueue().size(), lessThan(sizeThreshold - 1))); + assertThat(executor.getStartTimeOfLargeQueue(), equalTo(startTime)); + runTask(executor, () -> {}); + assertThat(executor.getStartTimeOfLargeQueue(), equalTo(-1L)); + + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + } + + private void runTask(EsThreadPoolExecutor executor, Runnable task) { + if (randomBoolean()) { + executor.execute(task); + } else { + executor.submit(task); + } } private static void waitForTimeElapse(TimeValue duration) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index fff11ff6a043d..56f276bc8a167 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -51,12 +52,13 @@ public void run() {} private final String name; private final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig; + private final LongSupplier currentTimeMillisSupplier; // There may be racing on updating this field. It's OK since hot threads logging is very coarse grained time wise // and can tolerate some inaccuracies. 
private volatile long startTimeOfLargeQueue = -1L; - private final FrequencyCappedAction hotThreadsLogger = new FrequencyCappedAction(System::currentTimeMillis, TimeValue.ZERO); + private final FrequencyCappedAction hotThreadsLogger; EsThreadPoolExecutor( String name, @@ -82,7 +84,6 @@ public void run() {} ); } - @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") EsThreadPoolExecutor( String name, int corePoolSize, @@ -94,11 +95,42 @@ public void run() {} RejectedExecutionHandler handler, ThreadContext contextHolder, EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig + ) { + this( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + handler, + contextHolder, + hotThreadsOnLargeQueueConfig, + System::currentTimeMillis + ); + } + + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") + EsThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler, + ThreadContext contextHolder, + EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig, + LongSupplier currentTimeMillisSupplier // For test to configure a custom time supplier ) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; + this.currentTimeMillisSupplier = currentTimeMillisSupplier; + this.hotThreadsLogger = new FrequencyCappedAction(currentTimeMillisSupplier, TimeValue.ZERO); this.hotThreadsLogger.setMinInterval(hotThreadsOnLargeQueueConfig.interval()); } @@ -114,16 +146,39 @@ public void setMaximumPoolSize(int maximumPoolSize) { @Override public void execute(Runnable command) { - final boolean isNotProbe = command != WORKER_PROBE; - final Runnable 
wrappedRunnable = isNotProbe ? wrapRunnable(command) : WORKER_PROBE; + final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE; + + maybeLogForLargeQueueSize(); - if (isNotProbe && hotThreadsOnLargeQueueConfig.isEnabled()) { + try { + super.execute(wrappedRunnable); + } catch (Exception e) { + if (wrappedRunnable instanceof AbstractRunnable abstractRunnable) { + try { + // If we are an abstract runnable we can handle the exception + // directly and don't need to rethrow it, but we log and assert + // any unexpected exception first. + if (e instanceof EsRejectedExecutionException == false) { + logException(abstractRunnable, e); + } + abstractRunnable.onRejection(e); + } finally { + abstractRunnable.onAfter(); + } + } else { + throw e; + } + } + } + + private void maybeLogForLargeQueueSize() { + if (hotThreadsOnLargeQueueConfig.isEnabled()) { int queueSize = getQueue().size(); // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well, // thus reaching the threshold of 500. It won't log right away due to the duration threshold. if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) { final long startTime = startTimeOfLargeQueue; - final long now = System.currentTimeMillis(); + final long now = currentTimeMillisSupplier.getAsLong(); if (startTime == -1) { startTimeOfLargeQueue = now; } else { @@ -149,26 +204,11 @@ public void execute(Runnable command) { startTimeOfLargeQueue = -1L; } } + } - try { - super.execute(wrappedRunnable); - } catch (Exception e) { - if (wrappedRunnable instanceof AbstractRunnable abstractRunnable) { - try { - // If we are an abstract runnable we can handle the exception - // directly and don't need to rethrow it, but we log and assert - // any unexpected exception first. 
- if (e instanceof EsRejectedExecutionException == false) { - logException(abstractRunnable, e); - } - abstractRunnable.onRejection(e); - } finally { - abstractRunnable.onAfter(); - } - } else { - throw e; - } - } + // package private for testing + long getStartTimeOfLargeQueue() { + return startTimeOfLargeQueue; } // package-visible for testing From a644445f842c5c8637df162e33a8aa7b246cc8ed Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 8 Jan 2026 14:30:16 +1100 Subject: [PATCH 08/15] tweak --- .../util/concurrent/ThreadPoolHotThreadsIT.java | 11 +++++++++++ .../common/util/concurrent/EsThreadPoolExecutor.java | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java index b9cdadf7d1d88..461ad7bd04a35 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java @@ -56,6 +56,17 @@ private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings final String node = internalCluster().startNode(nodeSettings); final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, node); + for (var info : threadPool.info()) { + final var executor = threadPool.executor(info.getName()); + if (executor instanceof EsThreadPoolExecutor esThreadPoolExecutor) { + if (enabled && ThreadPool.Names.MANAGEMENT.equals(info.getName())) { + assertTrue(esThreadPoolExecutor.getHotThreadsOnLargeQueueConfig().isEnabled()); + } else { + assertFalse(esThreadPoolExecutor.getHotThreadsOnLargeQueueConfig().isEnabled()); + } + } + } + final var managementExecutor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.MANAGEMENT); final int maxManagementThreads = 
threadPool.info(ThreadPool.Names.MANAGEMENT).getMax(); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 56f276bc8a167..2611f8d9421e0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -206,6 +206,11 @@ private void maybeLogForLargeQueueSize() { } } + // package private for testing + EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig() { + return hotThreadsOnLargeQueueConfig; + } + // package private for testing long getStartTimeOfLargeQueue() { return startTimeOfLargeQueue; From 6202a70766d90039b6f87a34202902195a523ae2 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 8 Jan 2026 16:51:04 +1100 Subject: [PATCH 09/15] thread-safe logging --- .../concurrent/ThreadPoolHotThreadsIT.java | 2 +- .../common/util/concurrent/EsExecutors.java | 5 +- .../util/concurrent/EsThreadPoolExecutor.java | 74 ++++++++++--------- .../DefaultBuiltInExecutorBuilders.java | 2 +- 4 files changed, 44 insertions(+), 39 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java index 461ad7bd04a35..453f3e7171bdc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java @@ -148,7 +148,7 @@ public void testEsThreadPoolExecutor() throws Exception { EsExecutors.daemonThreadFactory(nodeName, threadName), new EsExecutors.ForceQueuePolicy(false, false), new ThreadContext(Settings.EMPTY), - new 
EsExecutors.HotThreadsOnLargeQueueConfig(sizeThreshold, durationThreshold.millis(), interval), + new EsExecutors.HotThreadsOnLargeQueueConfig(sizeThreshold, durationThreshold.millis(), interval.millis()), timer::get ); assertThat(executor.getStartTimeOfLargeQueue(), equalTo(-1L)); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 8073ab01fb2a0..e91b40f21de76 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import java.util.List; @@ -662,9 +661,9 @@ public TaskTrackingConfig build() { } } - public record HotThreadsOnLargeQueueConfig(int sizeThreshold, long durationThresholdInMillis, TimeValue interval) { + public record HotThreadsOnLargeQueueConfig(int sizeThreshold, long durationThresholdInMillis, long intervalInMillis) { - public static final HotThreadsOnLargeQueueConfig DISABLED = new HotThreadsOnLargeQueueConfig(0, -1, TimeValue.MINUS_ONE); + public static final HotThreadsOnLargeQueueConfig DISABLED = new HotThreadsOnLargeQueueConfig(0, -1, -1); public boolean isEnabled() { return sizeThreshold > 0; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 2611f8d9421e0..13eec93fcc400 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Level; 
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.FrequencyCappedAction; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; @@ -23,6 +22,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.stream.Stream; @@ -34,6 +34,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); + private static final long NOT_TRACKED_TIME = -1L; // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy // https://github.com/elastic/elasticsearch/issues/124667 @@ -56,9 +57,9 @@ public void run() {} // There may be racing on updating this field. It's OK since hot threads logging is very coarse grained time wise // and can tolerate some inaccuracies. - private volatile long startTimeOfLargeQueue = -1L; + private volatile long startTimeOfLargeQueue = NOT_TRACKED_TIME; - private final FrequencyCappedAction hotThreadsLogger; + private final AtomicLong lastLoggingTimeForHotThreads; EsThreadPoolExecutor( String name, @@ -130,8 +131,9 @@ public void run() {} this.contextHolder = contextHolder; this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; this.currentTimeMillisSupplier = currentTimeMillisSupplier; - this.hotThreadsLogger = new FrequencyCappedAction(currentTimeMillisSupplier, TimeValue.ZERO); - this.hotThreadsLogger.setMinInterval(hotThreadsOnLargeQueueConfig.interval()); + this.lastLoggingTimeForHotThreads = hotThreadsOnLargeQueueConfig.isEnabled() + ? 
new AtomicLong(currentTimeMillisSupplier.getAsLong() - hotThreadsOnLargeQueueConfig.intervalInMillis()) + : null; } @Override @@ -172,37 +174,41 @@ public void execute(Runnable command) { } private void maybeLogForLargeQueueSize() { - if (hotThreadsOnLargeQueueConfig.isEnabled()) { - int queueSize = getQueue().size(); - // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well, - // thus reaching the threshold of 500. It won't log right away due to the duration threshold. - if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) { - final long startTime = startTimeOfLargeQueue; - final long now = currentTimeMillisSupplier.getAsLong(); - if (startTime == -1) { - startTimeOfLargeQueue = now; - } else { - final long duration = now - startTime; - if (duration >= hotThreadsOnLargeQueueConfig.durationThresholdInMillis()) { - hotThreadsLogger.maybeExecute(() -> { - HotThreads.logLocalHotThreads( - logger, - Level.INFO, - "ThreadPoolExecutor [" - + name - + "] queue size [" - + queueSize - + "] has been over threshold for [" - + TimeValue.timeValueMillis(duration) - + "]", - ReferenceDocs.LOGGING - ); - }); - } + if (hotThreadsOnLargeQueueConfig.isEnabled() == false) { + return; + } + + final int queueSize = getQueue().size(); + // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well, + // thus reaching the threshold of 500. It won't log right away due to the duration threshold. 
+ if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) { + final long startTime = startTimeOfLargeQueue; + final long now = currentTimeMillisSupplier.getAsLong(); + if (startTime == NOT_TRACKED_TIME) { + startTimeOfLargeQueue = now; + return; + } + final long duration = now - startTime; + if (duration >= hotThreadsOnLargeQueueConfig.durationThresholdInMillis()) { + final var lastLoggingTime = lastLoggingTimeForHotThreads.get(); + if (now - lastLoggingTime >= hotThreadsOnLargeQueueConfig.intervalInMillis() + && lastLoggingTimeForHotThreads.compareAndSet(lastLoggingTime, now)) { + HotThreads.logLocalHotThreads( + logger, + Level.INFO, + "ThreadPoolExecutor [" + + name + + "] queue size [" + + queueSize + + "] has been over threshold for [" + + TimeValue.timeValueMillis(duration) + + "]", + ReferenceDocs.LOGGING + ); } - } else { - startTimeOfLargeQueue = -1L; } + } else { + startTimeOfLargeQueue = NOT_TRACKED_TIME; } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 6d20baae31d92..3da4baf17a780 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -125,7 +125,7 @@ public Map getBuilders(Settings settings, int allocated new EsExecutors.HotThreadsOnLargeQueueConfig( hotThreadsOnLargeQueueSizeThreshold, hotThreadsOnLargeQueueDurationThreshold.millis(), - hotThreadsOnLargeQueueInterval + hotThreadsOnLargeQueueInterval.millis() ) ) ); From 61687b10f41558ba6109872bcef34e96f3f1a134 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 9 Jan 2026 11:18:18 +1100 Subject: [PATCH 10/15] move settings registration and test --- .../concurrent/ThreadPoolHotThreadsIT.java | 266 ------------------ .../common/settings/ClusterSettings.java | 4 - .../EsThreadPoolExecutorTestHelper.java | 58 
++++ 3 files changed, 58 insertions(+), 270 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java create mode 100644 test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java deleted file mode 100644 index 453f3e7171bdc..0000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/common/util/concurrent/ThreadPoolHotThreadsIT.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.common.util.concurrent; - -import org.apache.logging.log4j.Level; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.MockLog; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.IntStream; - -import static org.elasticsearch.test.MockLog.assertThatLogger; -import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING; -import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) -public class ThreadPoolHotThreadsIT extends ESIntegTestCase { - - public void testNotHotThreadsForPersistedManagementThreadPoolQueueSizeWhenDisabled() throws Exception { - final Settings.Builder settingsBuilder = Settings.builder() - .put(HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.getKey(), TimeValue.timeValueSeconds(1)); - if (randomBoolean()) { - settingsBuilder.put(HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.getKey(), 0); - } - doTestHotThreadsForPersistedManagementThreadPoolQueueSize(settingsBuilder.build()); - } - - public void testHotThreadsForPersistedManagementThreadPoolQueueSizeWhenEnabled() throws Exception { - doTestHotThreadsForPersistedManagementThreadPoolQueueSize( - Settings.builder() - .put(HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.getKey(), between(8, 16)) - .put(HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.getKey(), TimeValue.timeValueSeconds(1)) - .build() - ); - } - - 
private void doTestHotThreadsForPersistedManagementThreadPoolQueueSize(Settings nodeSettings) throws Exception { - final int sizeThreshold = HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.get(nodeSettings); - final TimeValue durationThreshold = HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.get(nodeSettings); - final boolean enabled = sizeThreshold > 0; - - final String node = internalCluster().startNode(nodeSettings); - final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, node); - for (var info : threadPool.info()) { - final var executor = threadPool.executor(info.getName()); - if (executor instanceof EsThreadPoolExecutor esThreadPoolExecutor) { - if (enabled && ThreadPool.Names.MANAGEMENT.equals(info.getName())) { - assertTrue(esThreadPoolExecutor.getHotThreadsOnLargeQueueConfig().isEnabled()); - } else { - assertFalse(esThreadPoolExecutor.getHotThreadsOnLargeQueueConfig().isEnabled()); - } - } - } - - final var managementExecutor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.MANAGEMENT); - final int maxManagementThreads = threadPool.info(ThreadPool.Names.MANAGEMENT).getMax(); - - final var executingLatch = new CountDownLatch(maxManagementThreads); - final var continueLatch = new CountDownLatch(1); - - final Runnable blockingTask = () -> { - executingLatch.countDown(); - safeAwait(continueLatch); - }; - - // Activate all available threads - for (int i = 0; i < maxManagementThreads; i++) { - runTask(managementExecutor, blockingTask); - } - - safeAwait(executingLatch); - assertThat(managementExecutor.getActiveCount(), equalTo(maxManagementThreads)); - logger.info("--> all blocking tasks are running"); - - // Fill the queue up to the threshold - for (int i = 0; i < sizeThreshold; i++) { - runTask(managementExecutor, () -> {}); - } - assertThat(managementExecutor.getQueue().size(), equalTo(sizeThreshold)); - assertThat(managementExecutor.getStartTimeOfLargeQueue(), enabled ? 
greaterThan(0L) : equalTo(-1L)); - waitForTimeElapse(durationThreshold); - - try { - if (enabled) { - assertThatLogger( - () -> runTask(managementExecutor, () -> {}), - EsThreadPoolExecutor.class, - new MockLog.SeenEventExpectation( - "should log hot threads", - EsThreadPoolExecutor.class.getCanonicalName(), - Level.INFO, - "ThreadPoolExecutor [*/management] queue size [" + sizeThreshold + "] has been over threshold for *" - ) - ); - assertThat(managementExecutor.getStartTimeOfLargeQueue(), greaterThan(0L)); - } - - assertThatLogger( - () -> runTask(managementExecutor, () -> {}), - EsThreadPoolExecutor.class, - new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") - ); - assertThat(managementExecutor.getStartTimeOfLargeQueue(), enabled ? greaterThan(0L) : equalTo(-1L)); - } finally { - continueLatch.countDown(); - } - - if (enabled) { - assertBusy(() -> assertThat(managementExecutor.getQueue().size(), lessThan(sizeThreshold - 1))); - // Run a final task and the tracking should be cleared since the large queue size is gone - runTask(managementExecutor, () -> {}); - assertThat(managementExecutor.getStartTimeOfLargeQueue(), equalTo(-1L)); - } - } - - public void testEsThreadPoolExecutor() throws Exception { - final long startTime = randomLongBetween(0, 1000); - final var timer = new AtomicLong(startTime); - final String nodeName = randomIdentifier(); - final String threadName = randomIdentifier(); - final int maxThreads = between(1, 5); - final var sizeThreshold = between(10, 100); - final TimeValue durationThreshold = randomTimeValue(1, 5, TimeUnit.MINUTES); - final TimeValue interval = randomTimeValue(30, 60, TimeUnit.MINUTES); - - final var executor = new EsThreadPoolExecutor( - threadName, - 1, - maxThreads, - 5, - TimeUnit.MINUTES, - new EsExecutors.ExecutorScalingQueue<>(), - EsExecutors.daemonThreadFactory(nodeName, threadName), - new EsExecutors.ForceQueuePolicy(false, false), - new 
ThreadContext(Settings.EMPTY), - new EsExecutors.HotThreadsOnLargeQueueConfig(sizeThreshold, durationThreshold.millis(), interval.millis()), - timer::get - ); - assertThat(executor.getStartTimeOfLargeQueue(), equalTo(-1L)); - - final var executingLatch = new CountDownLatch(maxThreads); - final var continueLatch = new CountDownLatch(1); - - final Runnable blockingTask = () -> { - executingLatch.countDown(); - safeAwait(continueLatch); - }; - - // Activate all available threads - for (int i = 0; i < maxThreads; i++) { - runTask(executor, blockingTask); - } - safeAwait(executingLatch); - assertThat(executor.getActiveCount(), equalTo(maxThreads)); - - try { - // 1. Fill the queue up to the threshold - No logging but tracking should start due to queue size reaching the threshold - assertThatLogger(() -> { - for (int i = 0; i < sizeThreshold; i++) { - runTask(executor, () -> {}); - } - assertThat(executor.getQueue().size(), equalTo(sizeThreshold)); - assertThat(executor.getStartTimeOfLargeQueue(), equalTo(timer.get())); - }, - EsThreadPoolExecutor.class, - new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") - ); - - // 2. Queue more tasks but should not see logging yet since duration threshold not met - assertThatLogger( - () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})), - EsThreadPoolExecutor.class, - new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") - ); - - // 3. 
Advance time and we should observe logging when adding more task - final long elapsedMillis1 = durationThreshold.millis() + randomLongBetween(0, 1000); - timer.addAndGet(elapsedMillis1); - assertThatLogger( - () -> runTask(executor, () -> {}), - EsThreadPoolExecutor.class, - new MockLog.SeenEventExpectation( - "should log hot threads", - EsThreadPoolExecutor.class.getCanonicalName(), - Level.INFO, - "ThreadPoolExecutor [" - + threadName - + "] queue size [" - + executor.getQueue().size() - + "] has been over threshold for [" - + TimeValue.timeValueMillis(elapsedMillis1) - + "]*" - ) - ); - - // 4. Add more task and there should be no more logging since logging interval has not passed yet - assertThatLogger( - () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})), - EsThreadPoolExecutor.class, - new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") - ); - - // 5. Advance time to pass the logging interval and we should observe logging again when adding more task - final long elapsedMillis2 = interval.millis() + randomLongBetween(0, 1000); - timer.addAndGet(elapsedMillis2); - assertThatLogger( - () -> runTask(executor, () -> {}), - EsThreadPoolExecutor.class, - new MockLog.SeenEventExpectation( - "should log hot threads", - EsThreadPoolExecutor.class.getCanonicalName(), - Level.INFO, - "ThreadPoolExecutor [" - + threadName - + "] queue size [" - + executor.getQueue().size() - + "] has been over threshold for [" - + TimeValue.timeValueMillis(elapsedMillis1 + elapsedMillis2) - + "]*" - ) - ); - } finally { - continueLatch.countDown(); - } - - // 6. 
Wait for the queue to drain and add one more task and the tracking should be reset - assertBusy(() -> assertThat(executor.getQueue().size(), lessThan(sizeThreshold - 1))); - assertThat(executor.getStartTimeOfLargeQueue(), equalTo(startTime)); - runTask(executor, () -> {}); - assertThat(executor.getStartTimeOfLargeQueue(), equalTo(-1L)); - - ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); - } - - private void runTask(EsThreadPoolExecutor executor, Runnable task) { - if (randomBoolean()) { - executor.execute(task); - } else { - executor.submit(task); - } - } - - private static void waitForTimeElapse(TimeValue duration) { - final var startTime = System.currentTimeMillis(); - // Ensure we wait for at least the specified duration by loop and explicit check the elapsed time to avoid - // potential inconsistency between slept time and System.currentTimeMillis() difference. - while (System.currentTimeMillis() - startTime < duration.millis()) { - safeSleep(duration); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 91cdafabf8c99..15c2ae238f00d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -138,7 +138,6 @@ import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker; import org.elasticsearch.snapshots.SnapshotsService; -import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterPortSettings; import org.elasticsearch.transport.RemoteClusterSettings; @@ -562,9 +561,6 @@ public void apply(Settings value, Settings current, Settings previous) { ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING, ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, 
ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING, - ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING, - ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING, - ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java new file mode 100644 index 0000000000000..1231b8264e62e --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java @@ -0,0 +1,58 @@ +/* + * ELASTICSEARCH CONFIDENTIAL + * __________________ + * + * Copyright Elasticsearch B.V. All rights reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of Elasticsearch B.V. and its suppliers, if any. + * The intellectual and technical concepts contained herein + * are proprietary to Elasticsearch B.V. and its suppliers and + * may be covered by U.S. and Foreign Patents, patents in + * process, and are protected by trade secret or copyright + * law. Dissemination of this information or reproduction of + * this material is strictly forbidden unless prior written + * permission is obtained from Elasticsearch B.V. 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.settings.Settings; + +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +import static org.elasticsearch.test.ESTestCase.randomIdentifier; + +public class EsThreadPoolExecutorTestHelper { + + public static EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig(EsThreadPoolExecutor executor) { + return executor.getHotThreadsOnLargeQueueConfig(); + } + + public static long getStartTimeOfLargeQueue(EsThreadPoolExecutor executor) { + return executor.getStartTimeOfLargeQueue(); + } + + public static EsThreadPoolExecutor newEsThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig, + LongSupplier currentTimeMillisSupplier + ) { + return new EsThreadPoolExecutor( + name, + corePoolSize, + maximumPoolSize, + 5, + TimeUnit.MINUTES, + new EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(randomIdentifier(), name), + new EsExecutors.ForceQueuePolicy(false, false), + new ThreadContext(Settings.EMPTY), + hotThreadsOnLargeQueueConfig, + currentTimeMillisSupplier + ); + } +} From f1c9fa43b3248f1c33a2f65a09faf957476eebcc Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 9 Jan 2026 11:20:03 +1100 Subject: [PATCH 11/15] add millis in names --- .../util/concurrent/EsThreadPoolExecutor.java | 20 +++++++++---------- .../EsThreadPoolExecutorTestHelper.java | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 13eec93fcc400..83bd01892a648 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ 
-57,9 +57,9 @@ public void run() {} // There may be racing on updating this field. It's OK since hot threads logging is very coarse grained time wise // and can tolerate some inaccuracies. - private volatile long startTimeOfLargeQueue = NOT_TRACKED_TIME; + private volatile long startTimeMillisOfLargeQueue = NOT_TRACKED_TIME; - private final AtomicLong lastLoggingTimeForHotThreads; + private final AtomicLong lastLoggingTimeMillisForHotThreads; EsThreadPoolExecutor( String name, @@ -131,7 +131,7 @@ public void run() {} this.contextHolder = contextHolder; this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; this.currentTimeMillisSupplier = currentTimeMillisSupplier; - this.lastLoggingTimeForHotThreads = hotThreadsOnLargeQueueConfig.isEnabled() + this.lastLoggingTimeMillisForHotThreads = hotThreadsOnLargeQueueConfig.isEnabled() ? new AtomicLong(currentTimeMillisSupplier.getAsLong() - hotThreadsOnLargeQueueConfig.intervalInMillis()) : null; } @@ -182,17 +182,17 @@ private void maybeLogForLargeQueueSize() { // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well, // thus reaching the threshold of 500. It won't log right away due to the duration threshold. 
if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) { - final long startTime = startTimeOfLargeQueue; + final long startTime = startTimeMillisOfLargeQueue; final long now = currentTimeMillisSupplier.getAsLong(); if (startTime == NOT_TRACKED_TIME) { - startTimeOfLargeQueue = now; + startTimeMillisOfLargeQueue = now; return; } final long duration = now - startTime; if (duration >= hotThreadsOnLargeQueueConfig.durationThresholdInMillis()) { - final var lastLoggingTime = lastLoggingTimeForHotThreads.get(); + final var lastLoggingTime = lastLoggingTimeMillisForHotThreads.get(); if (now - lastLoggingTime >= hotThreadsOnLargeQueueConfig.intervalInMillis() - && lastLoggingTimeForHotThreads.compareAndSet(lastLoggingTime, now)) { + && lastLoggingTimeMillisForHotThreads.compareAndSet(lastLoggingTime, now)) { HotThreads.logLocalHotThreads( logger, Level.INFO, @@ -208,7 +208,7 @@ private void maybeLogForLargeQueueSize() { } } } else { - startTimeOfLargeQueue = NOT_TRACKED_TIME; + startTimeMillisOfLargeQueue = NOT_TRACKED_TIME; } } @@ -218,8 +218,8 @@ EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig() { } // package private for testing - long getStartTimeOfLargeQueue() { - return startTimeOfLargeQueue; + long getStartTimeMillisOfLargeQueue() { + return startTimeMillisOfLargeQueue; } // package-visible for testing diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java index 1231b8264e62e..221a84e43d14c 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java @@ -30,8 +30,8 @@ public static EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueue return 
executor.getHotThreadsOnLargeQueueConfig(); } - public static long getStartTimeOfLargeQueue(EsThreadPoolExecutor executor) { - return executor.getStartTimeOfLargeQueue(); + public static long getStartTimeMillisOfLargeQueue(EsThreadPoolExecutor executor) { + return executor.getStartTimeMillisOfLargeQueue(); } public static EsThreadPoolExecutor newEsThreadPoolExecutor( From acd166510fb976606ed43f7d09ebceee03e2db93 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 9 Jan 2026 11:41:47 +1100 Subject: [PATCH 12/15] extra logging --- .../common/util/concurrent/EsThreadPoolExecutor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 83bd01892a648..b49db00b5a00e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -193,6 +193,7 @@ private void maybeLogForLargeQueueSize() { final var lastLoggingTime = lastLoggingTimeMillisForHotThreads.get(); if (now - lastLoggingTime >= hotThreadsOnLargeQueueConfig.intervalInMillis() && lastLoggingTimeMillisForHotThreads.compareAndSet(lastLoggingTime, now)) { + logger.info("start logging hot-threads for large queue size [{}] on [{}] executor", queueSize, name); HotThreads.logLocalHotThreads( logger, Level.INFO, From 0e6ec3d4e1e68e456604d79ddcc2da40e47cc178 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 9 Jan 2026 12:40:35 +1100 Subject: [PATCH 13/15] license headers --- .../EsThreadPoolExecutorTestHelper.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java 
index 221a84e43d14c..c1227ab249e17 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java @@ -1,18 +1,10 @@ /* - * ELASTICSEARCH CONFIDENTIAL - * __________________ - * - * Copyright Elasticsearch B.V. All rights reserved. - * - * NOTICE: All information contained herein is, and remains - * the property of Elasticsearch B.V. and its suppliers, if any. - * The intellectual and technical concepts contained herein - * are proprietary to Elasticsearch B.V. and its suppliers and - * may be covered by U.S. and Foreign Patents, patents in - * process, and are protected by trade secret or copyright - * law. Dissemination of this information or reproduction of - * this material is strictly forbidden unless prior written - * permission is obtained from Elasticsearch B.V. + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
*/ package org.elasticsearch.common.util.concurrent; From 8328d5df6007e02c211f68870ad0cd9434561640 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 9 Jan 2026 15:07:29 +1100 Subject: [PATCH 14/15] move unit test --- .../EsThreadPoolExecutorTestHelper.java | 29 ------------------- 1 file changed, 29 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java index c1227ab249e17..d39f024319a94 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java @@ -9,13 +9,6 @@ package org.elasticsearch.common.util.concurrent; -import org.elasticsearch.common.settings.Settings; - -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; - -import static org.elasticsearch.test.ESTestCase.randomIdentifier; - public class EsThreadPoolExecutorTestHelper { public static EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig(EsThreadPoolExecutor executor) { @@ -25,26 +18,4 @@ public static EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueue public static long getStartTimeMillisOfLargeQueue(EsThreadPoolExecutor executor) { return executor.getStartTimeMillisOfLargeQueue(); } - - public static EsThreadPoolExecutor newEsThreadPoolExecutor( - String name, - int corePoolSize, - int maximumPoolSize, - EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig, - LongSupplier currentTimeMillisSupplier - ) { - return new EsThreadPoolExecutor( - name, - corePoolSize, - maximumPoolSize, - 5, - TimeUnit.MINUTES, - new EsExecutors.ExecutorScalingQueue<>(), - EsExecutors.daemonThreadFactory(randomIdentifier(), name), - new EsExecutors.ForceQueuePolicy(false, false), - new 
ThreadContext(Settings.EMPTY), - hotThreadsOnLargeQueueConfig, - currentTimeMillisSupplier - ); - } } From 84a98d41d6b01a7ba2d5dc2b4a43813923871c19 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 9 Jan 2026 15:10:35 +1100 Subject: [PATCH 15/15] actually move unit test --- .../EsThreadPoolExecutorHotThreadsTests.java | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java new file mode 100644 index 0000000000000..72ad3d9dba1e1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.apache.logging.log4j.Level;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLog;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.test.MockLog.assertThatLogger;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Unit test for the hot-threads logging that {@link EsThreadPoolExecutor} performs when its queue stays at or above the configured
+ * size threshold for at least the configured duration. The executor reads time from a manually advanced counter, so the test never
+ * has to wait for real time to pass.
+ */
+public class EsThreadPoolExecutorHotThreadsTests extends ESTestCase {
+
+    // Executor under test; created by the test method and terminated in tearDown().
+    private EsThreadPoolExecutor executor;
+
+    /**
+     * Runs the superclass tear-down and then terminates the executor created by the test, if any, with a 10 second timeout.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (executor != null) {
+            ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Walks the executor through the whole tracking life cycle: tracking starts once the queue reaches the size threshold, hot
+     * threads are logged only after the duration threshold has elapsed, repeated logging is throttled by the interval, and the
+     * tracking is reset by the first task submitted after the queue has dropped below the threshold.
+     */
+    public void testEsThreadPoolExecutor() throws Exception {
+        final long startTime = randomLongBetween(0, 1000);
+        // Fake clock handed to the executor; only advanced explicitly by this test.
+        final var timer = new AtomicLong(startTime);
+        final String threadName = randomIdentifier();
+        final int maxThreads = between(1, 5);
+        final var sizeThreshold = between(10, 100);
+        final TimeValue durationThreshold = randomTimeValue(1, 5, TimeUnit.MINUTES);
+        final TimeValue interval = randomTimeValue(30, 60, TimeUnit.MINUTES);
+
+        executor = new EsThreadPoolExecutor(
+            threadName,
+            1,
+            maxThreads,
+            5,
+            TimeUnit.MINUTES,
+            new EsExecutors.ExecutorScalingQueue<>(),
+            EsExecutors.daemonThreadFactory(randomIdentifier(), threadName),
+            new EsExecutors.ForceQueuePolicy(false, false),
+            new ThreadContext(Settings.EMPTY),
+            new EsExecutors.HotThreadsOnLargeQueueConfig(sizeThreshold, durationThreshold.millis(), interval.millis()),
+            timer::get
+        );
+        // -1 is the value reported while the large queue is not being tracked.
+        assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(-1L));
+
+        final var executingLatch = new CountDownLatch(maxThreads);
+        final var continueLatch = new CountDownLatch(1);
+
+        final Runnable blockingTask = () -> {
+            executingLatch.countDown();
+            safeAwait(continueLatch);
+        };
+
+        // The try starts before the blocking tasks are submitted so that the workers are always released, even if one of the
+        // set-up assertions below fails.
+        try {
+            // Occupy every available thread so that all subsequent tasks end up in the queue
+            for (int i = 0; i < maxThreads; i++) {
+                runTask(executor, blockingTask);
+            }
+            safeAwait(executingLatch);
+            assertThat(executor.getActiveCount(), equalTo(maxThreads));
+
+            // 1. Fill the queue up to the threshold - no logging yet, but tracking should start because the size threshold is reached
+            assertThatLogger(() -> {
+                for (int i = 0; i < sizeThreshold; i++) {
+                    runTask(executor, () -> {});
+                }
+                assertThat(executor.getQueue().size(), equalTo(sizeThreshold));
+                assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(timer.get()));
+            },
+                EsThreadPoolExecutor.class,
+                new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*")
+            );
+
+            // 2. Queue more tasks - still no logging because the duration threshold has not been met
+            assertThatLogger(
+                () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})),
+                EsThreadPoolExecutor.class,
+                new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*")
+            );
+
+            // 3. Advance time past the duration threshold - submitting one more task should now log the hot threads.
+            // The expected messages are built before the task is submitted, so they carry the queue size seen at submission time.
+            final long elapsedMillis1 = durationThreshold.millis() + randomLongBetween(0, 1000);
+            timer.addAndGet(elapsedMillis1);
+            assertThatLogger(
+                () -> runTask(executor, () -> {}),
+                EsThreadPoolExecutor.class,
+                new MockLog.SeenEventExpectation(
+                    "should log starting hot threads",
+                    EsThreadPoolExecutor.class.getCanonicalName(),
+                    Level.INFO,
+                    "start logging hot-threads for large queue size [" + executor.getQueue().size() + "] on [" + threadName + "] executor"
+                ),
+                new MockLog.SeenEventExpectation(
+                    "should log hot threads",
+                    EsThreadPoolExecutor.class.getCanonicalName(),
+                    Level.INFO,
+                    "ThreadPoolExecutor ["
+                        + threadName
+                        + "] queue size ["
+                        + executor.getQueue().size()
+                        + "] has been over threshold for ["
+                        + TimeValue.timeValueMillis(elapsedMillis1)
+                        + "]*"
+                )
+            );
+
+            // 4. Add more tasks - there should be no further logging because the logging interval has not passed yet
+            assertThatLogger(
+                () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})),
+                EsThreadPoolExecutor.class,
+                new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*")
+            );
+
+            // 5. Advance time past the logging interval - submitting one more task should log again, and the reported duration
+            // covers the whole time since tracking started
+            final long elapsedMillis2 = interval.millis() + randomLongBetween(0, 1000);
+            timer.addAndGet(elapsedMillis2);
+            assertThatLogger(
+                () -> runTask(executor, () -> {}),
+                EsThreadPoolExecutor.class,
+                new MockLog.SeenEventExpectation(
+                    "should log starting hot threads",
+                    EsThreadPoolExecutor.class.getCanonicalName(),
+                    Level.INFO,
+                    "start logging hot-threads for large queue size [" + executor.getQueue().size() + "] on [" + threadName + "] executor"
+                ),
+                new MockLog.SeenEventExpectation(
+                    "should log hot threads",
+                    EsThreadPoolExecutor.class.getCanonicalName(),
+                    Level.INFO,
+                    "ThreadPoolExecutor ["
+                        + threadName
+                        + "] queue size ["
+                        + executor.getQueue().size()
+                        + "] has been over threshold for ["
+                        + TimeValue.timeValueMillis(elapsedMillis1 + elapsedMillis2)
+                        + "]*"
+                )
+            );
+        } finally {
+            // Release the blocked workers so that the queue can drain and the executor can be terminated
+            continueLatch.countDown();
+        }
+
+        // 6. Wait for the queue to drain. The tracked start time is only updated when a task is submitted, so it keeps its old
+        // value until one more task is added, at which point the tracking is reset.
+        assertBusy(() -> assertThat(executor.getQueue().size(), lessThan(sizeThreshold - 1)));
+        assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(startTime));
+        runTask(executor, () -> {});
+        assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(-1L));
+    }
+
+    /**
+     * Hands the task to the executor through either {@code execute} or {@code submit}, chosen at random, so that both entry
+     * points are exercised.
+     */
+    private void runTask(EsThreadPoolExecutor executor, Runnable task) {
+        if (randomBoolean()) {
+            executor.execute(task);
+        } else {
+            executor.submit(task);
+        }
+    }
+}