Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,14 @@ public static EsThreadPoolExecutor newScaling(
boolean rejectAfterShutdown,
ThreadFactory threadFactory,
ThreadContext contextHolder,
TaskTrackingConfig config
TaskTrackingConfig config,
HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig
) {
LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
// Force queued work via ForceQueuePolicy might starve if no worker is available (if core size is empty),
// probing the worker pool prevents this.
boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue;
final ForceQueuePolicy queuePolicy = new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool);
if (config.trackExecutionTime()) {
return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
name,
Expand All @@ -135,9 +137,10 @@ public static EsThreadPoolExecutor newScaling(
queue,
TimedRunnable::new,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
queuePolicy,
contextHolder,
config
config,
hotThreadsOnLargeQueueConfig
);
} else {
return new EsThreadPoolExecutor(
Expand All @@ -148,8 +151,9 @@ public static EsThreadPoolExecutor newScaling(
unit,
queue,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
contextHolder
queuePolicy,
contextHolder,
hotThreadsOnLargeQueueConfig
);
}
}
Expand Down Expand Up @@ -188,7 +192,8 @@ public static EsThreadPoolExecutor newScaling(
rejectAfterShutdown,
threadFactory,
contextHolder,
TaskTrackingConfig.DO_NOT_TRACK
TaskTrackingConfig.DO_NOT_TRACK,
HotThreadsOnLargeQueueConfig.DISABLED
);
}

Expand Down Expand Up @@ -221,7 +226,8 @@ public static EsThreadPoolExecutor newFixed(
threadFactory,
rejectedExecutionHandler,
contextHolder,
config
config,
HotThreadsOnLargeQueueConfig.DISABLED
);
} else {
return new EsThreadPoolExecutor(
Expand All @@ -233,7 +239,8 @@ public static EsThreadPoolExecutor newFixed(
queue,
threadFactory,
rejectedExecutionHandler,
contextHolder
contextHolder,
HotThreadsOnLargeQueueConfig.DISABLED
);
}
}
Expand Down Expand Up @@ -654,4 +661,12 @@ public TaskTrackingConfig build() {
}
}

/**
 * Configuration for logging local hot threads when an executor's work queue stays large.
 *
 * @param sizeThreshold queue size at/above which tracking starts; a non-positive value disables the feature
 * @param durationThresholdInMillis how long the queue must remain at/above {@code sizeThreshold} before hot threads are logged
 * @param intervalInMillis minimum interval between two consecutive hot-threads logs
 */
public record HotThreadsOnLargeQueueConfig(int sizeThreshold, long durationThresholdInMillis, long intervalInMillis) {

    // Sentinel instance that disables the feature (sizeThreshold == 0, see isEnabled()).
    // NOTE(review): any instance with sizeThreshold <= 0 is equally "disabled"; callers must use
    // isEnabled() rather than an identity comparison against DISABLED.
    public static final HotThreadsOnLargeQueueConfig DISABLED = new HotThreadsOnLargeQueueConfig(0, -1, -1);

    /**
     * @return true if hot-threads logging on large queue size is enabled, i.e. the size threshold is positive
     */
    public boolean isEnabled() {
        return sizeThreshold > 0;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

import static org.elasticsearch.core.Strings.format;
Expand All @@ -28,6 +34,7 @@
public class EsThreadPoolExecutor extends ThreadPoolExecutor {

private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);
private static final long NOT_TRACKED_TIME = -1L;

// noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy
// https://github.com/elastic/elasticsearch/issues/124667
Expand All @@ -45,6 +52,15 @@ public void run() {}
*/
private final String name;

private final EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig;
private final LongSupplier currentTimeMillisSupplier;

// There may be racing on updating this field. It's OK since hot threads logging is very coarse grained time wise
// and can tolerate some inaccuracies.
private volatile long startTimeMillisOfLargeQueue = NOT_TRACKED_TIME;

private final AtomicLong lastLoggingTimeMillisForHotThreads;

/**
 * Convenience constructor that uses {@link EsAbortPolicy} as the rejection handler and
 * disables hot-threads logging on large queue sizes.
 */
EsThreadPoolExecutor(
    String name,
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    ThreadContext contextHolder
) {
    this(
        name,
        corePoolSize,
        maximumPoolSize,
        keepAliveTime,
        unit,
        workQueue,
        threadFactory,
        new EsAbortPolicy(),
        contextHolder,
        EsExecutors.HotThreadsOnLargeQueueConfig.DISABLED
    );
}

/**
 * Constructor that uses the wall clock ({@link System#currentTimeMillis()}) as the time source
 * for hot-threads-on-large-queue tracking. Tests use the package-private overload to inject a
 * controllable time supplier.
 */
EsThreadPoolExecutor(
    String name,
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler,
    ThreadContext contextHolder,
    EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig
) {
    this(
        name,
        corePoolSize,
        maximumPoolSize,
        keepAliveTime,
        unit,
        workQueue,
        threadFactory,
        handler,
        contextHolder,
        hotThreadsOnLargeQueueConfig,
        // Absolute wall-clock time rather than the thread pool's cached relative time; wiring the
        // cached clock through would require this-escape from ThreadPool's constructor and
        // cascading builder changes -- TODO revisit if this feature becomes permanent.
        System::currentTimeMillis
    );
}

@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
Expand All @@ -68,11 +122,18 @@ public void run() {}
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler,
ThreadContext contextHolder
ThreadContext contextHolder,
EsExecutors.HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig,
LongSupplier currentTimeMillisSupplier // For test to configure a custom time supplier
) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.name = name;
this.contextHolder = contextHolder;
this.hotThreadsOnLargeQueueConfig = hotThreadsOnLargeQueueConfig;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.lastLoggingTimeMillisForHotThreads = hotThreadsOnLargeQueueConfig.isEnabled()
? new AtomicLong(currentTimeMillisSupplier.getAsLong() - hotThreadsOnLargeQueueConfig.intervalInMillis())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIt: this could also just be zero couldn't it?

Copy link
Member Author

@ywangd ywangd Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. Zero effectively means the logging has an initial delay of 60 min (the default interval) on node start. So we can miss the logging if a large queue size occurs within that time frame.

Copy link
Contributor

@nicktindall nicktindall Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it? Because `now` would be millis since epoch, so `now - 0` would be much larger than `intervalInMillis`?

if (now - lastLoggingTime >= hotThreadsOnLargeQueueConfig.intervalInMillis()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misunderstood your suggestion. Yes, it can technically be 0 since we are using an absolute timer. I didn't do it because that makes test manipulation harder, i.e. the initial time is fixed and cannot be controlled by the unit test, hence the choice. I prefer to keep it as is for the time being.

: null;
}

@Override
Expand All @@ -88,6 +149,9 @@ public void setMaximumPoolSize(int maximumPoolSize) {
@Override
public void execute(Runnable command) {
final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE;

maybeLogForLargeQueueSize();

try {
super.execute(wrappedRunnable);
} catch (Exception e) {
Expand All @@ -109,6 +173,56 @@ public void execute(Runnable command) {
}
}

/**
 * If enabled via {@link EsExecutors.HotThreadsOnLargeQueueConfig}, tracks how long the work
 * queue has been at/above the configured size threshold and, once it has stayed there for the
 * configured duration, logs local hot threads. Logging is rate-limited to at most once per
 * configured interval across all submitting threads (enforced by a CAS on
 * {@code lastLoggingTimeMillisForHotThreads}).
 */
private void maybeLogForLargeQueueSize() {
    if (hotThreadsOnLargeQueueConfig.isEnabled() == false) {
        return;
    }

    final int queueSize = getQueue().size();
    // Use queueSize + 1 so that we start to track when queueSize is 499 and this task is most likely to be queued as well,
    // thus reaching the threshold of 500. It won't log right away due to the duration threshold.
    if (queueSize + 1 >= hotThreadsOnLargeQueueConfig.sizeThreshold()) {
        final long startTime = startTimeMillisOfLargeQueue;
        final long now = currentTimeMillisSupplier.getAsLong();
        if (startTime == NOT_TRACKED_TIME) {
            // First observation at/above the threshold: start the clock, don't log yet.
            // Racy writes to the volatile field are tolerated (see field comment).
            startTimeMillisOfLargeQueue = now;
            return;
        }
        final long duration = now - startTime;
        if (duration >= hotThreadsOnLargeQueueConfig.durationThresholdInMillis()) {
            final var lastLoggingTime = lastLoggingTimeMillisForHotThreads.get();
            // The CAS guarantees a single winner per interval even with concurrent submitters.
            if (now - lastLoggingTime >= hotThreadsOnLargeQueueConfig.intervalInMillis()
                && lastLoggingTimeMillisForHotThreads.compareAndSet(lastLoggingTime, now)) {
                // Hot-threads capture emits its report only after a sampling delay, so announce
                // upfront that a capture has started.
                logger.info("start logging hot-threads for large queue size [{}] on [{}] executor", queueSize, name);
                HotThreads.logLocalHotThreads(
                    logger,
                    Level.INFO,
                    "ThreadPoolExecutor ["
                        + name
                        + "] queue size ["
                        + queueSize
                        + "] has been over threshold for ["
                        + TimeValue.timeValueMillis(duration)
                        + "]",
                    ReferenceDocs.LOGGING
                );
            }
        }
    } else {
        // Queue dropped below the threshold: reset tracking so the duration restarts next time.
        startTimeMillisOfLargeQueue = NOT_TRACKED_TIME;
    }
}

// package private for testing: exposes the configured hot-threads-on-large-queue settings
EsExecutors.HotThreadsOnLargeQueueConfig getHotThreadsOnLargeQueueConfig() {
    return hotThreadsOnLargeQueueConfig;
}

// package private for testing: returns when the queue first crossed the size threshold,
// or NOT_TRACKED_TIME (-1) if the queue is not currently being tracked
long getStartTimeMillisOfLargeQueue() {
    return startTimeMillisOfLargeQueue;
}

// package-visible for testing
void logException(AbstractRunnable r, Exception e) {
logger.error(() -> format("[%s] unexpected exception when submitting task [%s] for execution", name, r), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.util.concurrent.EsExecutors.HotThreadsOnLargeQueueConfig;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
Expand Down Expand Up @@ -72,9 +73,21 @@ public enum UtilizationTrackingPurpose {
ThreadFactory threadFactory,
RejectedExecutionHandler handler,
ThreadContext contextHolder,
TaskTrackingConfig trackingConfig
TaskTrackingConfig trackingConfig,
HotThreadsOnLargeQueueConfig hotThreadsOnLargeQueueConfig
) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);
super(
name,
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler,
contextHolder,
hotThreadsOnLargeQueueConfig
);

this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.Map;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING;
import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING;
import static org.elasticsearch.threadpool.ScalingExecutorBuilder.HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA;

Expand All @@ -32,6 +35,9 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
final int halfProcMaxAt10 = ThreadPool.halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = ThreadPool.boundedBy(4 * allocatedProcessors, 128, 512);
final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings);
final int hotThreadsOnLargeQueueSizeThreshold = HOT_THREADS_ON_LARGE_QUEUE_SIZE_THRESHOLD_SETTING.get(settings);
final TimeValue hotThreadsOnLargeQueueDurationThreshold = HOT_THREADS_ON_LARGE_QUEUE_DURATION_THRESHOLD_SETTING.get(settings);
final TimeValue hotThreadsOnLargeQueueInterval = HOT_THREADS_ON_LARGE_QUEUE_INTERVAL_SETTING.get(settings);

Map<String, ExecutorBuilder> result = new HashMap<>();
result.put(
Expand Down Expand Up @@ -115,7 +121,12 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
1,
ThreadPool.boundedBy(allocatedProcessors, 1, 5),
TimeValue.timeValueMinutes(5),
false
false,
new EsExecutors.HotThreadsOnLargeQueueConfig(
hotThreadsOnLargeQueueSizeThreshold,
hotThreadsOnLargeQueueDurationThreshold.millis(),
hotThreadsOnLargeQueueInterval.millis()
)
)
);
result.put(
Expand Down
Loading