Skip to content

Commit 7ef43a2

Browse files
committed
limit the max size of bulk and index thread pools to bounded number of processors
1 parent 445be98 commit 7ef43a2

File tree

2 files changed

+62
-2
lines changed

2 files changed

+62
-2
lines changed

core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
458458
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
459459
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
460460
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
461-
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
461+
int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax()));
462462
if (previousInfo.getMax() != updatedSize) {
463463
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
464464
// if you think this code is crazy: that's because it is!
@@ -480,7 +480,7 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
480480
defaultQueueSize = previousInfo.getQueueSize();
481481
}
482482

483-
int size = settings.getAsInt("size", defaultSize);
483+
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
484484
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
485485
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
486486
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
@@ -533,6 +533,20 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde
533533
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
534534
}
535535

536+
private int applyHardSizeLimit(String name, int requestedSize) {
537+
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
538+
if (name.equals(Names.BULK) || name.equals(Names.INDEX)) {
539+
// We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means
540+
// too many segments written, too frequently, too much merging, etc:
541+
542+
// TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous
543+
// because on upgrade this setting could be in cluster state and hard for the user to correct?
544+
return Math.min(requestedSize, availableProcessors);
545+
} else {
546+
return requestedSize;
547+
}
548+
}
549+
536550
private void updateSettings(Settings settings) {
537551
Map<String, Settings> groupSettings = settings.getAsGroups();
538552
if (groupSettings.isEmpty()) {

core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.settings.ClusterSettings;
2323
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2425
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2526
import org.elasticsearch.test.ESTestCase;
2627
import org.elasticsearch.threadpool.ThreadPool.Names;
@@ -89,6 +90,51 @@ public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedExcep
8990
}
9091
}
9192

93+
public void testIndexingThreadPoolsMaxSize() throws InterruptedException {
94+
String threadPoolName = randomThreadPoolName();
95+
for (String name : new String[] {ThreadPool.Names.BULK, ThreadPool.Names.INDEX}) {
96+
ThreadPool threadPool = null;
97+
try {
98+
99+
int maxSize = EsExecutors.boundedNumberOfProcessors(Settings.EMPTY);
100+
101+
// try to create a too-big (maxSize+1) thread pool
102+
threadPool = new ThreadPool(settingsBuilder()
103+
.put("name", "testIndexingThreadPoolsMaxSize")
104+
.put("threadpool." + name + ".size", maxSize+1)
105+
.build());
106+
107+
// confirm it clipped us at the maxSize:
108+
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
109+
110+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
111+
threadPool.setClusterSettings(clusterSettings);
112+
113+
// update it to a tiny size:
114+
clusterSettings.applySettings(
115+
settingsBuilder()
116+
.put("threadpool." + name + ".size", 1)
117+
.build()
118+
);
119+
120+
// confirm it worked:
121+
assertEquals(1, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
122+
123+
// try to update to too-big size:
124+
clusterSettings.applySettings(
125+
settingsBuilder()
126+
.put("threadpool." + name + ".size", maxSize+1)
127+
.build()
128+
);
129+
130+
// confirm it clipped us at the maxSize:
131+
assertEquals(maxSize, ((ThreadPoolExecutor) threadPool.executor(name)).getMaximumPoolSize());
132+
} finally {
133+
terminateThreadPoolIfNeeded(threadPool);
134+
}
135+
}
136+
}
137+
92138
public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException {
93139
String threadPoolName = randomThreadPoolName();
94140
ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);

0 commit comments

Comments
 (0)