Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -46,12 +48,27 @@

public class EsExecutors {

private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(EsExecutors.class));

/**
* Settings key to manually set the number of available processors.
* This is used to adjust thread pools sizes etc. per node.
* Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
*/
public static final Setting<Integer> PROCESSORS_SETTING =
Setting.intSetting("processors", Runtime.getRuntime().availableProcessors(), 1, Property.NodeScope);
public static final Setting<Integer> PROCESSORS_SETTING = new Setting<>(
"processors",
s -> Integer.toString(Runtime.getRuntime().availableProcessors()),
s -> {
final int value = Setting.parseInt(s, 1, "processors");
final int availableProcessors = Runtime.getRuntime().availableProcessors();
if (value > availableProcessors) {
deprecationLogger.deprecatedAndMaybeLog(
"processors",
"setting processors to value [{}] which is more than available processors [{}] is deprecated",
value,
availableProcessors);
}
return value;
},
Property.NodeScope);

/**
* Returns the number of available processors. Defaults to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -450,7 +451,7 @@ private static Settings getRandomNodeSettings(long seed) {
builder.put(SearchService.DEFAULT_KEEPALIVE_SETTING.getKey(), timeValueSeconds(100 + random.nextInt(5 * 60)).getStringRep());
}

builder.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1 + random.nextInt(3));
builder.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
if (random.nextBoolean()) {
if (random.nextBoolean()) {
builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
* @param settings The current settings
* @return A number between 5 and the number of processors
*/
static int getWatcherThreadPoolSize(Settings settings) {
boolean isDataNode = Node.NODE_DATA_SETTING.get(settings);
static int getWatcherThreadPoolSize(final Settings settings) {
return getWatcherThreadPoolSize(Node.NODE_DATA_SETTING.get(settings), EsExecutors.numberOfProcessors(settings));
}

static int getWatcherThreadPoolSize(final boolean isDataNode, final int numberOfProcessors) {
if (isDataNode) {
int numberOfProcessors = EsExecutors.numberOfProcessors(settings);
long size = Math.max(Math.min(5 * numberOfProcessors, 50), numberOfProcessors);
final long size = Math.max(Math.min(5 * numberOfProcessors, 50), numberOfProcessors);
return Math.toIntExact(size);
} else {
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,19 @@ public void testWatcherDisabledTests() throws Exception {

public void testThreadPoolSize() {
// old calculation was 5 * number of processors
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 1).build()), is(5));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 2).build()), is(10));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 4).build()), is(20));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 8).build()), is(40));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 9).build()), is(45));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 10).build()), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 16).build()), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 24).build()), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 50).build()), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 51).build()), is(51));
assertThat(Watcher.getWatcherThreadPoolSize(Settings.builder().put("processors", 96).build()), is(96));

Settings noDataNodeSettings = Settings.builder()
.put("processors", scaledRandomIntBetween(1, 100))
.put("node.data", false)
.build();
assertThat(Watcher.getWatcherThreadPoolSize(noDataNodeSettings), is(1));
assertThat(Watcher.getWatcherThreadPoolSize(true, 1), is(5));
assertThat(Watcher.getWatcherThreadPoolSize(true, 2), is(10));
assertThat(Watcher.getWatcherThreadPoolSize(true, 4), is(20));
assertThat(Watcher.getWatcherThreadPoolSize(true, 8), is(40));
assertThat(Watcher.getWatcherThreadPoolSize(true, 9), is(45));
assertThat(Watcher.getWatcherThreadPoolSize(true, 10), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(true, 16), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(true, 24), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(true, 50), is(50));
assertThat(Watcher.getWatcherThreadPoolSize(true, 51), is(51));
assertThat(Watcher.getWatcherThreadPoolSize(true, 96), is(96));

assertThat(Watcher.getWatcherThreadPoolSize(false, scaledRandomIntBetween(1, 100)), is(1));
}

public void testReload() {
Expand Down