Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/142746.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: Search
issues: []
pr: 142746
summary: Add search task watchdog to log hot threads on slow search
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,57 @@ The following search settings are supported:
* `search.aggs.rewrite_to_filter_by_filter`


## Search task watchdog settings [search-task-watchdog-settings]
```{applies_to}
stack: ga 9.4
```

The search task watchdog monitors long-running search tasks and logs hot threads when thresholds are exceeded.
This helps diagnose slow searches by capturing thread activity while the search is still running, rather
than just logging after completion.

On [data nodes](docs-content://deploy-manage/distributed-architecture/clusters-nodes-shards/node-roles.md#data-node-role),
the watchdog logs hot threads when a shard-level search operation (query/fetch phase) exceeds the
data node threshold. On [coordinator nodes](docs-content://deploy-manage/distributed-architecture/clusters-nodes-shards/node-roles.md#coordinating-only-node-role),
it logs hot threads for long-running coordinator search tasks only when they have no outstanding
shard child requests. This avoids redundant logging when the coordinator is simply waiting for
slow shards, which log their own hot threads.

The hot threads output is gzip-compressed and base64-encoded. To decode it, use:

```sh
echo "<base64-data>" | base64 --decode | gzip --decompress
```

If the output is split across multiple log lines, concatenate them first.

$$$search-task-watchdog-enabled$$$

`search.task_watchdog.enabled`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting), boolean) Enables or disables the search task watchdog. Defaults to `false`.

$$$search-task-watchdog-coordinator-threshold$$$

`search.task_watchdog.coordinator_threshold`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting), [time value](/reference/elasticsearch/rest-apis/api-conventions.md#time-units)) Threshold for coordinator tasks. When a search task on the coordinator node exceeds
this duration and has no outstanding shard child requests, hot threads are logged. Set to `-1ms`
to disable coordinator task monitoring.
Defaults to `3s`.
Comment thread
spinscale marked this conversation as resolved.

$$$search-task-watchdog-data-node-threshold$$$

`search.task_watchdog.data_node_threshold`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting), [time value](/reference/elasticsearch/rest-apis/api-conventions.md#time-units)) Threshold for data node shard tasks. When a shard-level search operation (query or fetch phase) exceeds this duration, hot threads are logged.
Set to `-1ms` to disable data node task monitoring. Defaults to `3s`.

$$$search-task-watchdog-interval$$$

`search.task_watchdog.interval`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting), [time value](/reference/elasticsearch/rest-apis/api-conventions.md#time-units)) How frequently the watchdog checks for slow tasks. Lower values detect slow tasks sooner but consume more resources. Minimum value is `100ms`. Defaults to `1s`.

$$$search-task-watchdog-cooldown-period$$$

`search.task_watchdog.cooldown_period`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting), [time value](/reference/elasticsearch/rest-apis/api-conventions.md#time-units)) Minimum time between hot threads logging on this node. This prevents flooding the logs when many tasks are slow simultaneously. Defaults to `30s`.


Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.core.Strings.format;

/**
* Monitors search tasks for slow execution and logs hot threads when thresholds are exceeded.
* <p>
* On data nodes, logs hot threads when a shard-level search operation (query/fetch phase) exceeds
* the data node threshold. On coordinator nodes, logs hot threads when a search task has been running
* longer than the coordinator threshold and has no outstanding shard child requests.
* <p>
* This helps diagnose slow searches by capturing what threads are doing while the search is
* still running, rather than just logging after completion.
*/
public class SearchTaskWatchdog extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(SearchTaskWatchdog.class);

/**
* Dynamic, node-scoped switch for the watchdog. Defaults to {@code false}.
*/
public static final Setting<Boolean> ENABLED = Setting.boolSetting(
"search.task_watchdog.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Dynamic, node-scoped threshold for coordinator search tasks. Defaults to 3 seconds; the lowest
* accepted value is {@link TimeValue#MINUS_ONE}. Non-positive values are treated as "disabled" by
* {@code computeMinThreshold} and {@code handleTask}.
*/
public static final Setting<TimeValue> COORDINATOR_THRESHOLD = Setting.timeSetting(
"search.task_watchdog.coordinator_threshold",
TimeValue.timeValueSeconds(3),
TimeValue.MINUS_ONE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Dynamic, node-scoped threshold for shard-level search tasks. Defaults to 3 seconds; the lowest
* accepted value is {@link TimeValue#MINUS_ONE}. Non-positive values are treated as "disabled" by
* {@code computeMinThreshold} and {@code handleTask}.
*/
public static final Setting<TimeValue> DATA_NODE_THRESHOLD = Setting.timeSetting(
"search.task_watchdog.data_node_threshold",
TimeValue.timeValueSeconds(3),
TimeValue.MINUS_ONE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Dynamic, node-scoped delay between two watchdog checks. Defaults to 1 second with a minimum of
* 100 milliseconds.
*/
public static final Setting<TimeValue> INTERVAL = Setting.timeSetting(
"search.task_watchdog.interval",
TimeValue.timeValueSeconds(1),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"search.task_watchdog.cooldown_period",
TimeValue.timeValueSeconds(30),
TimeValue.ZERO,
Comment thread
spinscale marked this conversation as resolved.
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// source of the cancellable tasks that each watchdog pass iterates over
private final TaskManager taskManager;
// supplies the relative clock and schedules each pass on the generic executor
private final ThreadPool threadPool;

// current values of the dynamic settings; volatile because the settings-update consumers write them
// while the scheduled pass reads them
private volatile boolean enabled;
private volatile long coordinatorThresholdNanos;
private volatile long dataNodeThresholdNanos;
// smallest positive threshold of the two above, or Long.MAX_VALUE when neither is positive
private volatile long minThresholdNanos;
// delay passed to the thread pool when the next pass is scheduled
private volatile TimeValue interval;
// minimum number of nanoseconds between two hot threads log entries
private volatile long cooldownPeriodNanos;
// relative nano time of the last hot threads log entry; stays 0 until the first one is written
private volatile long lastLoggedNanos = 0;
// true while a pass is scheduled or running, so that at most one pass is pending at a time
private final AtomicBoolean scheduled = new AtomicBoolean(false);

public SearchTaskWatchdog(Settings settings, ClusterSettings clusterSettings, TaskManager taskManager, ThreadPool threadPool) {
this.taskManager = taskManager;
this.threadPool = threadPool;

this.enabled = ENABLED.get(settings);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't all of these called with the settings update consumer anyway, so no need to call twice?

@andreidan andreidan Feb 23, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. What are we calling twice?
We initialize enabled here and then subscribe to changes a bit later in the constructor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps using initializeAndWatch is what you meant here? d1a0d29

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes!

this.coordinatorThresholdNanos = COORDINATOR_THRESHOLD.get(settings).nanos();
this.dataNodeThresholdNanos = DATA_NODE_THRESHOLD.get(settings).nanos();
this.minThresholdNanos = computeMinThreshold(coordinatorThresholdNanos, dataNodeThresholdNanos);
this.interval = INTERVAL.get(settings);
this.cooldownPeriodNanos = COOLDOWN_PERIOD.get(settings).nanos();

clusterSettings.addSettingsUpdateConsumer(ENABLED, this::setEnabled);
clusterSettings.addSettingsUpdateConsumer(COORDINATOR_THRESHOLD, v -> setCoordinatorThreshold(v.nanos()));
clusterSettings.addSettingsUpdateConsumer(DATA_NODE_THRESHOLD, v -> setDataNodeThreshold(v.nanos()));
clusterSettings.addSettingsUpdateConsumer(INTERVAL, v -> this.interval = v);
clusterSettings.addSettingsUpdateConsumer(COOLDOWN_PERIOD, v -> this.cooldownPeriodNanos = v.nanos());
}

/**
 * Update consumer for {@code search.task_watchdog.enabled}.
 * <p>
 * Stores the new flag. When the new value is {@code true} and the component has already been
 * started, a watchdog pass is scheduled; {@link #scheduleNext()} itself makes sure that no second
 * pass is queued if one is already pending.
 *
 * @param enabled the new value of the setting
 */
private void setEnabled(boolean enabled) {
    this.enabled = enabled;
    if (enabled == false) {
        return;
    }
    if (lifecycle.started()) {
        scheduleNext();
    }
}

private void setCoordinatorThreshold(long newCoordinatorThresholdValue) {
this.coordinatorThresholdNanos = newCoordinatorThresholdValue;
this.minThresholdNanos = computeMinThreshold(newCoordinatorThresholdValue, dataNodeThresholdNanos);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to pass variables, as you set this.coordinatorThresholdNanos here already and this.dataNodeThresholdNanos you can just use these in computeMinThreshold()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's more readable with parameters (i.e. it conveys what it does without having to step inside the method)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, for me it was the opposite. As we're essentially testing this.coordinatorThresholdNanos and this.dataNodeThresholdNanos, but passing different variable names, I considered it harder to read. The name conveys to me what it does.

I don't have a strong preference though.

}

/**
 * Update consumer for the data node threshold: stores the new value and refreshes the cached
 * minimum of the two thresholds.
 *
 * @param newDataNodeThresholdValue the new data node threshold in nanoseconds
 */
private void setDataNodeThreshold(long newDataNodeThresholdValue) {
    this.dataNodeThresholdNanos = newDataNodeThresholdValue;
    final long smallestEnabledThreshold = computeMinThreshold(coordinatorThresholdNanos, newDataNodeThresholdValue);
    this.minThresholdNanos = smallestEnabledThreshold;
}

/**
 * Returns the smaller of the two thresholds, ignoring any that is not strictly positive.
 *
 * @param coordinatorNanos coordinator threshold in nanoseconds; non-positive values are ignored
 * @param dataNodeNanos    data node threshold in nanoseconds; non-positive values are ignored
 * @return the smallest positive threshold, or {@link Long#MAX_VALUE} when neither is positive
 */
private static long computeMinThreshold(long coordinatorNanos, long dataNodeNanos) {
    long smallest = Long.MAX_VALUE;
    if (coordinatorNanos > 0) {
        smallest = coordinatorNanos;
    }
    if (dataNodeNanos > 0 && dataNodeNanos < smallest) {
        smallest = dataNodeNanos;
    }
    return smallest;
}

/**
 * Lifecycle start hook: schedules the first watchdog pass, but only if the watchdog is enabled
 * at the time the component starts.
 */
@Override
protected void doStart() {
    if (enabled == false) {
        return;
    }
    scheduleNext();
}

// Nothing to do on stop: run() and scheduleNext() both check lifecycle.stoppedOrClosed(), so a pass
// that is already scheduled returns early and is not rescheduled.
@Override
protected void doStop() {}

// Nothing to do on close: this class holds no resources of its own that need releasing.
@Override
protected void doClose() {}

/**
 * Schedules the next watchdog pass on the generic executor after {@code interval}, unless the
 * watchdog is disabled, the component is stopped or closed, or a pass is already pending.
 */
private void scheduleNext() {
    if (enabled == false || lifecycle.stoppedOrClosed()) {
        return;
    }
    // The compare-and-set is evaluated last so the flag is only claimed when a pass really is
    // about to be scheduled; run() clears it again in its finally block.
    if (scheduled.compareAndSet(false, true)) {
        threadPool.scheduleUnlessShuttingDown(interval, threadPool.generic(), this::run);
    }
}

private void run() {
try {
if (enabled == false || lifecycle.stoppedOrClosed()) {
return;
}

final long now = threadPool.relativeTimeInNanos();

// skip task iteration if in cooldown period
Comment thread
andreidan marked this conversation as resolved.
Outdated
if (lastLoggedNanos > 0 && (now - lastLoggedNanos) < cooldownPeriodNanos) {
return;
}

if (minThresholdNanos < Long.MAX_VALUE) {
taskManager.forEachCancellableTask(minThresholdNanos, info -> {
try {
handleTask(info, now);
} catch (Exception e) {
logger.debug(() -> "error processing task [" + info.task().getId() + "]", e);
}
// we logged a slow task in this iteration, so skip checking other tasks
// as we're now in the cooldown period
return lastLoggedNanos != now;
});
}
} finally {
scheduled.set(false);
scheduleNext();
}
}

private void handleTask(TaskManager.CancellableTaskInfo info, long now) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC);
if (info.task() instanceof SearchShardTask) {
if (dataNodeThresholdNanos > 0 && info.elapsedNanos() > dataNodeThresholdNanos) {
Comment thread
spinscale marked this conversation as resolved.
logSlowTask(info, "shard", now);
}
} else if (info.task() instanceof SearchTask) {
if (coordinatorThresholdNanos > 0
&& info.elapsedNanos() > coordinatorThresholdNanos
&& info.hasOutstandingChildren() == false) {
logSlowTask(info, "coordinator", now);
}
}
}

/**
 * Records {@code now} as the time of the last log entry and writes the local hot threads to the
 * log at {@code INFO} level, prefixed with a description of the slow task.
 *
 * @param info the slow task
 * @param type label placed in the log message, as supplied by the caller
 * @param now  relative nano time of the current watchdog pass
 */
private void logSlowTask(TaskManager.CancellableTaskInfo info, String type, long now) {
    // Set before logging so the cooldown applies even if writing the hot threads fails.
    lastLoggedNanos = now;

    final long taskId = info.task().getId();
    final String description = format("slow search %s task [%d] parent [%s]", type, taskId, info.task().getParentTaskId());
    HotThreads.logLocalHotThreads(logger, Level.INFO, description, ReferenceDocs.SEARCH_TASK_WATCHDOG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public enum ReferenceDocs {
DEPLOY_CLOUD_DIFF_FROM_STATEFUL,
DELETE_INDEX_BLOCK,
ARCHIVED_SETTINGS,
MACHINE_LEARNING_SETTINGS
MACHINE_LEARNING_SETTINGS,
SEARCH_TASK_WATCHDOG
// this comment keeps the ';' on the next line so every entry above has a trailing ',' which makes the diff for adding new links cleaner
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.ingest.SimulatePipelineTransportAction;
import org.elasticsearch.action.search.SearchLogProducer;
import org.elasticsearch.action.search.SearchTaskWatchdog;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -460,6 +461,11 @@ public void apply(Settings value, Settings current, Settings previous) {
NetworkService.TCP_RECEIVE_BUFFER_SIZE,
ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL,
ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME,
SearchTaskWatchdog.ENABLED,
SearchTaskWatchdog.COORDINATOR_THRESHOLD,
SearchTaskWatchdog.DATA_NODE_THRESHOLD,
SearchTaskWatchdog.INTERVAL,
SearchTaskWatchdog.COOLDOWN_PERIOD,
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.search.SearchTaskWatchdog;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -285,6 +286,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(SearchTaskWatchdog.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(NodeMetrics.class).start();
injector.getInstance(IndicesMetrics.class).start();
Expand Down Expand Up @@ -484,6 +486,7 @@ private void stop() {
stopIfStarted(nodeService.getMonitorService());
stopIfStarted(GatewayService.class);
stopIfStarted(SearchService.class);
stopIfStarted(SearchTaskWatchdog.class);
stopIfStarted(TransportService.class);
stopIfStarted(NodeMetrics.class);
stopIfStarted(IndicesMetrics.class);
Expand Down Expand Up @@ -553,6 +556,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
toClose.add(injector.getInstance(SearchService.class));
toClose.add(injector.getInstance(SearchTaskWatchdog.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(injector.getInstance(NodeMetrics.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchTaskWatchdog;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.update.UpdateHelper;
Expand Down Expand Up @@ -1283,6 +1284,13 @@ public <T extends TransportResponse> void sendRequest(
onlinePrewarmingService
);

final SearchTaskWatchdog searchTaskWatchdog = new SearchTaskWatchdog(
settings,
settingsModule.getClusterSettings(),
transportService.getTaskManager(),
threadPool
);

final var shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, transportService, terminationHandler);

modules.add(loadPersistentTasksService(settingsModule, clusterService, threadPool, clusterModule.getIndexNameExpressionResolver()));
Expand Down Expand Up @@ -1355,6 +1363,7 @@ public <T extends TransportResponse> void sendRequest(
b.bind(MetadataUpdateSettingsService.class).toInstance(metadataUpdateSettingsService);
b.bind(MetadataIndexTemplateService.class).toInstance(metadataIndexTemplateService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTaskWatchdog.class).toInstance(searchTaskWatchdog);
b.bind(SearchResponseMetrics.class).toInstance(searchResponseMetrics);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder));
Expand Down
Loading
Loading