diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 5ed4136edd49a..e91b40f21de76 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -119,12 +119,14 @@ public static EsThreadPoolExecutor newScaling( boolean rejectAfterShutdown, ThreadFactory threadFactory, ThreadContext contextHolder, - TaskTrackingConfig config + TaskTrackingConfig config, + HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { LinkedTransferQueue queue = newUnboundedScalingLTQueue(min, max); // Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty), // probing the worker pool prevents this. boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue; + final ForceQueuePolicy queuePolicy = new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool); if (config.trackExecutionTime()) { return new TaskExecutionTimeTrackingEsThreadPoolExecutor( name, @@ -135,9 +137,10 @@ public static EsThreadPoolExecutor newScaling( queue, TimedRunnable::new, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), + queuePolicy, contextHolder, - config + config, + hotThreadsOnLargeQueueConfig ); } else { return new EsThreadPoolExecutor( @@ -148,8 +151,9 @@ public static EsThreadPoolExecutor newScaling( unit, queue, threadFactory, - new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool), - contextHolder + queuePolicy, + contextHolder, + hotThreadsOnLargeQueueConfig ); } } @@ -188,7 +192,8 @@ public static EsThreadPoolExecutor newScaling( rejectAfterShutdown, threadFactory, contextHolder, - TaskTrackingConfig.DO_NOT_TRACK + TaskTrackingConfig.DO_NOT_TRACK, + HotThreadsOnLargeQueueConfig.DISABLED ); } @@ -221,7 +226,8 @@ public static EsThreadPoolExecutor newFixed( threadFactory, rejectedExecutionHandler, contextHolder, - config + config, + HotThreadsOnLargeQueueConfig.DISABLED ); } else { return new EsThreadPoolExecutor( @@ -233,7 +239,8 @@ public static EsThreadPoolExecutor newFixed( queue, threadFactory, rejectedExecutionHandler, - contextHolder + contextHolder, + HotThreadsOnLargeQueueConfig.DISABLED ); } } @@ -654,4 +661,12 @@ public TaskTrackingConfig build() { } } + public record HotThreadsOnLargeQueueConfig(int sizeThreshold, long durationThresholdInMillis, long intervalInMillis) { + + public static final HotThreadsOnLargeQueueConfig DISABLED = new HotThreadsOnLargeQueueConfig(0, -1, -1); + + public boolean isEnabled() { + return sizeThreshold > 0; + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index ad4616692850e..b49db00b5a00e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -9,15 +9,21 @@ package org.elasticsearch.common.util.concurrent; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.monitor.jvm.HotThreads; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -28,6 +34,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); + private static final long NOT_TRACKED_TIME = -1L; // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy // https://github.com/elastic/elasticsearch/issues/124667 @@ -45,6 +52,15 @@ public void run() {} */ private final String name; + private final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig; + private final LongSupplier currentTimeMillisSupplier; + + // There may be racing on updating this field. It's OK since hot threads logging is very coarse grained time wise + // and can tolerate some inaccuracies. + private volatile long startTimeMillisOfLargeQueue = NOT_TRACKED_TIME; + + private final AtomicLong lastLoggingTimeMillisForHotThreads; + EsThreadPoolExecutor( String name, int corePoolSize, @@ -55,7 +71,45 @@ public void run() {} ThreadFactory threadFactory, ThreadContext contextHolder ) { - this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder); + this( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + new EsAbortPolicy(), + contextHolder, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED + ); + } + + EsThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler, + ThreadContext contextHolder, + EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig + ) { + this( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + handler, + contextHolder, + hotThreadsOnLargeQueueConfig, + System::currentTimeMillis + ); } @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") @@ -68,11 +122,18 @@ public void run() {} BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, - ThreadContext contextHolder + ThreadContext contextHolder, + EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig, + LongSupplier currentTimeMillisSupplier // For test to configure a custom time supplier ) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; + this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; + this.currentTimeMillisSupplier = currentTimeMillisSupplier; + this.lastLoggingTimeMillisForHotThreads = hotThreadsOnLargeQueueConfig.isEnabled() + ? new AtomicLong(currentTimeMillisSupplier.getAsLong() - hotThreadsOnLargeQueueConfig.intervalInMillis()) + : null; } @Override @@ -88,6 +149,9 @@ public void setMaximumPoolSize(int maximumPoolSize) { @Override public void execute(Runnable command) { final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE; + + maybeLogForLargeQueueSize(); + try { super.execute(wrappedRunnable); } catch (Exception e) { @@ -109,6 +173,56 @@ public void execute(Runnable command) { } } + private void maybeLogForLargeQueueSize() { + if (hotThreadsOnLargeQueueConfig.isEnabled() == false) { + return; + } + + final int queueSize = getQueue().size(); + // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well, + // thus reaching the threshold of 500. It won't log right away due to the duration threshold. + if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) { + final long startTime = startTimeMillisOfLargeQueue; + final long now = currentTimeMillisSupplier.getAsLong(); + if (startTime == NOT_TRACKED_TIME) { + startTimeMillisOfLargeQueue = now; + return; + } + final long duration = now - startTime; + if (duration >= hotThreadsOnLargeQueueConfig.durationThresholdInMillis()) { + final var lastLoggingTime = lastLoggingTimeMillisForHotThreads.get(); + if (now - lastLoggingTime >= hotThreadsOnLargeQueueConfig.intervalInMillis() + && lastLoggingTimeMillisForHotThreads.compareAndSet(lastLoggingTime, now)) { + logger.info("start logging hot-threads for large queue size [{}] on [{}] executor", queueSize, name); + HotThreads.logLocalHotThreads( + logger, + Level.INFO, + "ThreadPoolExecutor [" + + name + + "] queue size [" + + queueSize + + "] has been over threshold for [" + + TimeValue.timeValueMillis(duration) + + "]", + ReferenceDocs.LOGGING + ); + } + } + } else { + startTimeMillisOfLargeQueue = NOT_TRACKED_TIME; + } + } + + // package private for testing + EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig() { + return hotThreadsOnLargeQueueConfig; + } + + // package private for testing + long getStartTimeMillisOfLargeQueue() { + return startTimeMillisOfLargeQueue; + } + // package-visible for testing void logException(AbstractRunnable r, Exception e) { logger.error(() -> format("[%s] unexpected exception when submitting task [%s] for execution", name, r), e); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index f66b6384211b1..4ce1e05a597cd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.metrics.ExponentialBucketHistogram; +import org.elasticsearch.common.util.concurrent.EsExecutors.HotThreadsOnLargeQueueConfig; import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.core.TimeValue; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; @@ -72,9 +73,21 @@ public enum UtilizationTrackingPurpose { ThreadFactory threadFactory, RejectedExecutionHandler handler, ThreadContext contextHolder, - TaskTrackingConfig trackingConfig + TaskTrackingConfig trackingConfig, + HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { - super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); + super( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + handler, + contextHolder, + hotThreadsOnLargeQueueConfig + ); this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 23728fe68bb0f..3da4baf17a780 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -20,6 +20,9 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING; +import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA; @@ -32,6 +35,9 @@ public Map getBuilders(Settings settings, int allocated final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512); final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings); + final int hotThreadsOnLargeQueueSizeThreshold = HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.get(settings); + final TimeValue hotThreadsOnLargeQueueDurationThreshold = HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.get(settings); + final TimeValue hotThreadsOnLargeQueueInterval = HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING.get(settings); Map result = new HashMap<>(); result.put( @@ -115,7 +121,12 @@ public Map getBuilders(Settings settings, int allocated 1, ThreadPool.boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5), - false + false, + new EsExecutors.HotThreadsOnLargeQueueConfig( + hotThreadsOnLargeQueueSizeThreshold, + hotThreadsOnLargeQueueDurationThreshold.millis(), + hotThreadsOnLargeQueueInterval.millis() + ) ) ); result.put( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java index 0fb2f1e471d0b..4d62027fd4edc 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java @@ -32,11 +32,32 @@ */ public final class ScalingExecutorBuilder extends ExecutorBuilder { + public static final Setting HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING = Setting.intSetting( + "thread_pools.hot_threads_on_large_queue.size_threshold", + 0, // default 0 means disabled + 0, + Setting.Property.NodeScope + ); + public static final Setting HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING = Setting.timeSetting( + "thread_pools.hot_threads_on_large_queue.duration_threshold", + TimeValue.timeValueMinutes(2), + TimeValue.timeValueSeconds(1), + Setting.Property.NodeScope + ); + + public static final Setting HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING = Setting.timeSetting( + "thread_pools.hot_threads_on_large_queue.interval", + TimeValue.timeValueMinutes(60), + TimeValue.timeValueMinutes(10), + Setting.Property.NodeScope + ); + private final Setting coreSetting; private final Setting maxSetting; private final Setting keepAliveSetting; private final boolean rejectAfterShutdown; private final EsExecutors.TaskTrackingConfig trackingConfig; + private final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig; /** * Construct a scaling executor builder; the settings will have the @@ -59,6 +80,38 @@ public ScalingExecutorBuilder( this(name, core, max, keepAlive, rejectAfterShutdown, "thread_pool." + name); } + /** + * Construct a scaling executor builder; the settings will have the + * key prefix "thread_pool." followed by the executor name. + * + * @param name the name of the executor + * @param core the minimum number of threads in the pool + * @param max the maximum number of threads in the pool + * @param keepAlive the time that spare threads above {@code core} + * threads will be kept alive + * @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown + * @param hotThreadsOnLargeQueueConfig configuration for indicating whether hot threads should be logged on large queue size + */ + public ScalingExecutorBuilder( + final String name, + final int core, + final int max, + final TimeValue keepAlive, + final boolean rejectAfterShutdown, + final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig + ) { + this( + name, + core, + max, + keepAlive, + rejectAfterShutdown, + "thread_pool." + name, + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK, + hotThreadsOnLargeQueueConfig + ); + } + /** * Construct a scaling executor builder; the settings will have the * specified key prefix. @@ -79,7 +132,16 @@ public ScalingExecutorBuilder( final boolean rejectAfterShutdown, final String prefix ) { - this(name, core, max, keepAlive, rejectAfterShutdown, prefix, EsExecutors.TaskTrackingConfig.DO_NOT_TRACK); + this( + name, + core, + max, + keepAlive, + rejectAfterShutdown, + prefix, + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED + ); } /** @@ -94,6 +156,7 @@ public ScalingExecutorBuilder( * @param prefix the prefix for the settings keys * @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown * @param trackingConfig configuration that'll indicate if we should track statistics about task execution time + * @param hotThreadsOnLargeQueueConfig configuration for indicating whether hot threads should be logged on large queue size */ public ScalingExecutorBuilder( final String name, @@ -102,7 +165,8 @@ public ScalingExecutorBuilder( final TimeValue keepAlive, final boolean rejectAfterShutdown, final String prefix, - final EsExecutors.TaskTrackingConfig trackingConfig + final EsExecutors.TaskTrackingConfig trackingConfig, + final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig ) { super(name, false); this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope); @@ -115,6 +179,7 @@ public ScalingExecutorBuilder( ); this.rejectAfterShutdown = rejectAfterShutdown; this.trackingConfig = trackingConfig; + this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig; } @Override @@ -147,7 +212,8 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th rejectAfterShutdown, threadFactory, threadContext, - trackingConfig + trackingConfig, + hotThreadsOnLargeQueueConfig ); return new ThreadPool.ExecutorHolder(executor, info); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index dd7274a5e0186..20be93291c5f1 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -690,7 +690,8 @@ public void testScalingWithTaskTimeTracking() { threadContext, randomBoolean() ? EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(executionTimeEwma).build() - : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(executionTimeEwma).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); } @@ -719,7 +720,8 @@ public void testScalingWithTaskTimeTracking() { randomBoolean(), TestEsExecutors.testOnlyDaemonThreadFactory("test"), threadContext, - DO_NOT_TRACK + DO_NOT_TRACK, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); assertThat(pool, instanceOf(EsThreadPoolExecutor.class)); } @@ -840,7 +842,8 @@ public void testScalingWithEmptyCoreAndWorkerPoolProbing() { new EsExecutors.ExecutorScalingQueue<>(), TestEsExecutors.testOnlyDaemonThreadFactory(getTestName()), new EsExecutors.ForceQueuePolicy(true, true), - threadContext + threadContext, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ) ); } @@ -857,7 +860,8 @@ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() { new EsExecutors.ExecutorScalingQueue<>(), TestEsExecutors.testOnlyDaemonThreadFactory(getTestName()), new EsExecutors.ForceQueuePolicy(true, true), - threadContext + threadContext, + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ) ); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java new file mode 100644 index 0000000000000..72ad3d9dba1e1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorHotThreadsTests.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util.concurrent; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +import static org.elasticsearch.test.MockLog.assertThatLogger; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +public class EsThreadPoolExecutorHotThreadsTests extends ESTestCase { + + private EsThreadPoolExecutor executor; + + public void tearDown() throws Exception { + super.tearDown(); + if (executor != null) { + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + } + } + + public void testEsThreadPoolExecutor() throws Exception { + final long startTime = randomLongBetween(0, 1000); + final var timer = new AtomicLong(startTime); + final String threadName = randomIdentifier(); + final int maxThreads = between(1, 5); + final var sizeThreshold = between(10, 100); + final TimeValue durationThreshold = randomTimeValue(1, 5, TimeUnit.MINUTES); + final TimeValue interval = randomTimeValue(30, 60, TimeUnit.MINUTES); + + executor = new EsThreadPoolExecutor( + threadName, + 1, + maxThreads, + 5, + TimeUnit.MINUTES, + new EsExecutors.ExecutorScalingQueue<>(), + EsExecutors.daemonThreadFactory(randomIdentifier(), threadName), + new EsExecutors.ForceQueuePolicy(false, false), + new ThreadContext(Settings.EMPTY), + new EsExecutors.HotThreadsOnLargeQueueConfig(sizeThreshold, durationThreshold.millis(), interval.millis()), + timer::get + ); + assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(-1L)); + + final var executingLatch = new CountDownLatch(maxThreads); + final var continueLatch = new CountDownLatch(1); + + final Runnable blockingTask = () -> { + executingLatch.countDown(); + safeAwait(continueLatch); + }; + + // Activate all available threads + for (int i = 0; i < maxThreads; i++) { + runTask(executor, blockingTask); + } + safeAwait(executingLatch); + assertThat(executor.getActiveCount(), equalTo(maxThreads)); + + try { + // 1. Fill the queue up to the threshold - No logging but tracking should start due to queue size reaching the threshold + assertThatLogger(() -> { + for (int i = 0; i < sizeThreshold; i++) { + runTask(executor, () -> {}); + } + assertThat(executor.getQueue().size(), equalTo(sizeThreshold)); + assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(timer.get())); + }, + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + + // 2. Queue more tasks but should not see logging yet since duration threshold not met + assertThatLogger( + () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})), + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + + // 3. Advance time and we should observe logging when adding more task + final long elapsedMillis1 = durationThreshold.millis() + randomLongBetween(0, 1000); + timer.addAndGet(elapsedMillis1); + assertThatLogger( + () -> runTask(executor, () -> {}), + EsThreadPoolExecutor.class, + new MockLog.SeenEventExpectation( + "should log starting hot threads", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "start logging hot-threads for large queue size [" + executor.getQueue().size() + "] on [" + threadName + "] executor" + ), + new MockLog.SeenEventExpectation( + "should log hot threads", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "ThreadPoolExecutor [" + + threadName + + "] queue size [" + + executor.getQueue().size() + + "] has been over threshold for [" + + TimeValue.timeValueMillis(elapsedMillis1) + + "]*" + ) + ); + + // 4. Add more task and there should be no more logging since logging interval has not passed yet + assertThatLogger( + () -> IntStream.range(0, between(1, 5)).forEach(ignore -> runTask(executor, () -> {})), + EsThreadPoolExecutor.class, + new MockLog.UnseenEventExpectation("should not log", EsThreadPoolExecutor.class.getCanonicalName(), Level.INFO, "*") + ); + + // 5. Advance time to pass the logging interval and we should observe logging again when adding more task + final long elapsedMillis2 = interval.millis() + randomLongBetween(0, 1000); + timer.addAndGet(elapsedMillis2); + assertThatLogger( + () -> runTask(executor, () -> {}), + EsThreadPoolExecutor.class, + new MockLog.SeenEventExpectation( + "should log starting hot threads", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "start logging hot-threads for large queue size [" + executor.getQueue().size() + "] on [" + threadName + "] executor" + ), + new MockLog.SeenEventExpectation( + "should log hot threads", + EsThreadPoolExecutor.class.getCanonicalName(), + Level.INFO, + "ThreadPoolExecutor [" + + threadName + + "] queue size [" + + executor.getQueue().size() + + "] has been over threshold for [" + + TimeValue.timeValueMillis(elapsedMillis1 + elapsedMillis2) + + "]*" + ) + ); + } finally { + continueLatch.countDown(); + } + + // 6. Wait for the queue to drain and add one more task and the tracking should be reset + assertBusy(() -> assertThat(executor.getQueue().size(), lessThan(sizeThreshold - 1))); + assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(startTime)); + runTask(executor, () -> {}); + assertThat(executor.getStartTimeMillisOfLargeQueue(), equalTo(-1L)); + } + + private void runTask(EsThreadPoolExecutor executor, Runnable task) { + if (randomBoolean()) { + executor.execute(task); + } else { + executor.submit(task); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 0c379dc26321f..558b143678ce4 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -57,7 +57,8 @@ public void testExecutionEWMACalculation() throws Exception { .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) .build() - : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -129,7 +130,8 @@ public void testFrontOfQueueLatency() throws Exception { : EsExecutors.TaskTrackingConfig.builder() .trackMaxQueueLatency() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); try { executor.prestartAllCoreThreads(); @@ -196,7 +198,8 @@ public void testMaxDequeuedQueueLatency() throws Exception { : EsExecutors.TaskTrackingConfig.builder() .trackMaxQueueLatency() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); try { executor.prestartAllCoreThreads(); @@ -242,7 +245,8 @@ public void testExceptionThrowingTask() throws Exception { .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) .build() - : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build() + : EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); @@ -276,7 +280,8 @@ public void testGetOngoingTasks() throws Exception { EsExecutors.TaskTrackingConfig.builder() .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); var taskRunningLatch = new CountDownLatch(1); var exitTaskLatch = new CountDownLatch(1); @@ -313,7 +318,8 @@ public void testQueueLatencyHistogramMetrics() { EsExecutors.TaskTrackingConfig.builder() .trackOngoingTasks() .trackExecutionTime(DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST) - .build() + .build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ); executor.setupMetrics(meterRegistry, threadPoolName); diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java new file mode 100644 index 0000000000000..d39f024319a94 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTestHelper.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util.concurrent; + +public class EsThreadPoolExecutorTestHelper { + + public static EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig(EsThreadPoolExecutor executor) { + return executor.getHotThreadsOnLargeQueueConfig(); + } + + public static long getStartTimeMillisOfLargeQueue(EsThreadPoolExecutor executor) { + return executor.getStartTimeMillisOfLargeQueue(); + } +} diff --git a/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java b/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java index 0d7fedc71cfc1..d9c74a216be4b 100644 --- a/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java +++ b/x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java @@ -175,7 +175,8 @@ public static ExecutorBuilder[] statelessExecutorBuilders(Settings settings, TimeValue.timeValueMinutes(5), true, SHARD_READ_THREAD_POOL_SETTING, - EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build() + EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build(), + EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED ), new ScalingExecutorBuilder( TRANSLOG_THREAD_POOL,