Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/migration/migrate_8_0.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ include::migrate_8_0/http.asciidoc[]
include::migrate_8_0/reindex.asciidoc[]
include::migrate_8_0/search.asciidoc[]
include::migrate_8_0/settings.asciidoc[]
include::migrate_8_0/threadpool.asciidoc[]
include::migrate_8_0/indices.asciidoc[]
include::migrate_8_0/api.asciidoc[]
10 changes: 10 additions & 0 deletions docs/reference/migration/migrate_8_0/threadpool.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[float]
[[breaking_80_threadpool_changes]]
=== Thread pool changes

[float]
==== Removal of the `fixed_auto_queue_size` thread pool type

The `fixed_auto_queue_size` thread pool type, previously marked as an
experimental feature, was deprecated in 7.x and has been removed in 8.0.
The `search` and `search_throttled` thread pools have the `fixed` type now.
53 changes: 3 additions & 50 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ There are several thread pools, but the important ones include:

`search`::
For count/search/suggest operations. Thread pool type is
`fixed_auto_queue_size` with a size of
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of
`fixed` with a size of
`int((# of available_processors * 3) / 2) + 1`, and queue_size of
`1000`.

[[search-throttled]]`search_throttled`::
For count/search/suggest/get operations on `search_throttled indices`.
Thread pool type is `fixed_auto_queue_size` with a size of `1`, and initial
queue_size of `100`.
Thread pool type is `fixed` with a size of `1`, and queue_size of `100`.

`get`::
For get operations. Thread pool type is `fixed`
Expand Down Expand Up @@ -119,52 +118,6 @@ thread_pool:
queue_size: 1000
--------------------------------------------------

[float]
[[fixed-auto-queue-size]]
==== `fixed_auto_queue_size`

experimental[]

The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle
the requests with a bounded queue for pending requests that have no threads to
service them. It's similar to the `fixed` threadpool, however, the `queue_size`
automatically adjusts according to calculations based on
https://en.wikipedia.org/wiki/Little%27s_law[Little's Law]. These calculations
will potentially adjust the `queue_size` up or down by 50 every time
`auto_queue_frame_size` operations have been completed.

The `size` parameter controls the number of threads.

The `queue_size` allows to control the initial size of the queue of pending
requests that have no threads to execute them.

The `min_queue_size` setting controls the minimum amount the `queue_size` can be
adjusted to.

The `max_queue_size` setting controls the maximum amount the `queue_size` can be
adjusted to.

The `auto_queue_frame_size` setting controls the number of operations during
which measurement is taken before the queue is adjusted. It should be large
enough that a single operation cannot unduly bias the calculation.

The `target_response_time` is a time value setting that indicates the targeted
average response time for tasks in the thread pool queue. If tasks are routinely
above this time, the thread pool queue will be adjusted down so that tasks are
rejected.

[source,yaml]
--------------------------------------------------
thread_pool:
search:
size: 30
queue_size: 500
min_queue_size: 10
max_queue_size: 1000
auto_queue_frame_size: 2000
target_response_time: 1s
--------------------------------------------------

[float]
[[scaling]]
==== `scaling`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,6 @@ public void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted
}
}

public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(autoQueueFixedExecutor));
checkExecutionError(getSubmitRunner(autoQueueFixedExecutor));
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
Expand Down Expand Up @@ -200,18 +189,6 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru
}
}

public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), true);
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;

import java.util.Arrays;
Expand Down Expand Up @@ -94,29 +93,6 @@ public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapa
queue, threadFactory, new EsAbortPolicy(), contextHolder);
}

/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*
* @param size number of fixed threads to use for executing tasks
* @param initialQueueCapacity initial size of the executor queue
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
*/
public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
ThreadFactory threadFactory, ThreadContext contextHolder) {
if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
initialQueueCapacity);
}
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
new EsAbortPolicy(), contextHolder);
}

/**
* Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown
* during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated
Expand Down
Loading