Skip to content
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
c753c31
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 6, 2025
7589b39
Merge upstream/main and FeatureRequest18674
Aug 6, 2025
4463d5a
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 7, 2025
e287c6f
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 9, 2025
875221e
[Feature Request] Add support for a ForkJoinPool type - Add changelog…
Aug 10, 2025
ddf3d9d
[Feature Request] Add support for a ForkJoinPool type - Add changelog…
Aug 10, 2025
e8e2e83
Add support for a ForkJoinPool type - spotlessApply
Aug 10, 2025
f96fd0e
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 10, 2025
5df90f6
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 10, 2025
0d508b6
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
30024e3
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
71e254c
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
c73c4d5
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
cb433f8
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
c1fa79c
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
71aaad9
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
c4781ba
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
0fbd1c2
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 11, 2025
0a683f7
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Aug 11, 2025
6b95c1b
Merge the upstream into my main
Aug 13, 2025
e235dd6
Add few more tests to cover the code
Aug 13, 2025
b42af66
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 14, 2025
fd10363
Add few more tests to cover the code
Aug 14, 2025
8b8225a
Backward Compatibility : fallback to FIXED for unknown types
Aug 14, 2025
24cbd53
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 15, 2025
ba4ab4d
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 15, 2025
ffacad4
Backward Compatibility : fallback to FIXED for unknown types
Aug 15, 2025
d73c0be
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 18, 2025
c053c6a
Backward Compatibility : fallback to FIXED for unknown types
Aug 18, 2025
6630cbc
Backward Compatibility : fallback to FIXED for unknown types
Aug 18, 2025
c6914ca
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 21, 2025
0315fb6
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Aug 26, 2025
7b813db
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 2, 2025
7f43e51
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 15, 2025
c9c9995
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 15, 2025
698a95e
Addressed the PR comments
Sep 15, 2025
4877c83
Apply spotlesscheck
Sep 18, 2025
49ac561
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 18, 2025
1b93311
Apply spotlesscheck and merge new changes to my main branch
Sep 18, 2025
009edeb
Apply spotlesscheck and merge new changes to my main branch
Sep 18, 2025
8949ccc
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 20, 2025
d60a5e5
Remove the allowlist and the registration method.
Sep 20, 2025
71f5d1f
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 23, 2025
71322fa
Address the PR comments for the awaitTermination in threadpool
Sep 23, 2025
38377d2
Address the PR comments for the awaitTermination in threadpool
Sep 23, 2025
17a731d
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 23, 2025
45cfd37
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 23, 2025
d3fc1d8
Addressed the PR comments
Sep 23, 2025
f30ed0d
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 23, 2025
5850f98
Address the PR comments
Sep 23, 2025
68098e7
Address the PR comments
Sep 23, 2025
60c699f
Address the PR comments
Sep 23, 2025
d25f0c4
Address the PR comments
Sep 23, 2025
384e750
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 23, 2025
cb89bdd
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 24, 2025
ce6c139
Address the PR comments
Sep 24, 2025
779be1d
Address the PR comments
Sep 24, 2025
4ad3782
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 25, 2025
9e36824
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Sep 25, 2025
5658e9f
Address the PR comments for Yaml tests
Sep 25, 2025
b9c706f
Address the PR comments for Yaml tests
Sep 26, 2025
f4fadbc
Address the PR comments for Yaml tests
Sep 26, 2025
eb7d36c
Address the PR comments for Yaml tests
Sep 26, 2025
d116df7
Address the PR comments for Yaml tests
Sep 26, 2025
cfd900f
Address the PR comments for Yaml tests
Sep 26, 2025
4fba6a7
Address the PR comments for Yaml tests
Sep 26, 2025
1fea0fc
Address the PR comments for Yaml tests
Sep 26, 2025
bfa2030
Address the PR comments for Yaml tests
Sep 26, 2025
a8b75be
Address the PR comments for Yaml tests
Sep 26, 2025
5aeeec4
Address the PR comments for Yaml tests
Sep 26, 2025
244ed19
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 5, 2025
4d647c3
Address code review comments
Oct 5, 2025
6075fe7
Address code review comments
Oct 6, 2025
0736ed4
Address code review comments
Oct 6, 2025
c9a42d3
Address code review comments
Oct 6, 2025
85daa06
Address code review comments
Oct 6, 2025
3e37fc3
Address code review comments
Oct 6, 2025
af44937
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 6, 2025
cd7e77d
Merging upstream main to my main
Oct 9, 2025
f9698e7
Resolve merge conflicts
Oct 9, 2025
8259f2a
Introduce new settings in ForkJoinPoolExecutorBuilder
Oct 9, 2025
2578f2d
overload the constructor and keep the existing version for compatibil…
Oct 9, 2025
0dad905
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 9, 2025
bf2f8d4
Fix the precommit failure issues
Oct 9, 2025
e95f41e
Fix the precommit failure issues
Oct 9, 2025
c56a05d
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 10, 2025
f15d508
Fix the test failures for queue size
Oct 10, 2025
856e7d5
Fix the test failures for queue size
Oct 10, 2025
e069acb
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 14, 2025
47e8b44
Added tests
Oct 14, 2025
251bc9f
Merge branch 'main' into main
srikanthpadakanti Oct 15, 2025
efcafd1
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 15, 2025
2238404
Added tests for the codecoverage target
Oct 15, 2025
6d5db6f
Changed te scope of buildTable and removed reflection
Oct 16, 2025
c9762f5
Changed te scope of buildTable and removed reflection
Oct 16, 2025
9e52fcd
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 16, 2025
5f55cd9
Changed the IT from plugin folder to single node IT with inline plugi…
Oct 16, 2025
1d64dba
Changed the IT from plugin folder to single node IT with inline plugi…
Oct 16, 2025
ab0a165
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 16, 2025
ca3f389
Changed the IT from plugin folder to single node IT with inline plugi…
Oct 16, 2025
f0ef745
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 17, 2025
22a1086
Removed the build.gradle dependency opensearch-common
Oct 17, 2025
0511a4a
Removed the build.gradle dependency opensearch-common
Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.x]
### Added
- Add support for a ForkJoinPool type ([#19008](https://github.com/opensearch-project/OpenSearch/pull/19008))
- Expand fetch phase profiling to support inner hits and top hits aggregation phases ([##18936](https://github.com/opensearch-project/OpenSearch/pull/18936))
- Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920))
- Implement Query Rewriting Infrastructure ([#19060](https://github.com/opensearch-project/OpenSearch/pull/19060))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand All @@ -70,6 +72,11 @@ public void tearDownThreadPool() {

public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
// ForkJoinPool is skipped here because it does not support all ThreadPoolExecutor features or APIs,
// and is tested separately in testExecutionErrorOnForkJoinPool.
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.FORK_JOIN) {
continue; // skip FORK_JOIN for these tests
}
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
checkExecutionError(getScheduleRunner(executor));
Expand Down Expand Up @@ -176,6 +183,11 @@ protected void doRun() {

public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
// ForkJoinPool is skipped here because it does not support all ThreadPoolExecutor features or APIs,
// and is tested separately in testExecutionErrorOnForkJoinPool.
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.FORK_JOIN) {
continue; // skip FORK_JOIN for these tests
}
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), true);

// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
Expand Down Expand Up @@ -391,4 +403,43 @@ private void runExecutionTest(
}
}

public void testExecutionExceptionOnForkJoinPool() throws InterruptedException {
ForkJoinPool fjp = new ForkJoinPool();
try {
checkExecutionException(getExecuteRunner(fjp), true);
checkExecutionException(getSubmitRunner(fjp), false);
} finally {
fjp.shutdownNow();
fjp.awaitTermination(10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnForkJoinPool() throws Exception {
ForkJoinPool fjp = new ForkJoinPool(8);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> thrown = new AtomicReference<>();
try {
fjp.execute(() -> {
try {
throw new Error("future error");
} catch (Throwable t) {
thrown.set(t);
} finally {
latch.countDown();
}
});

// Wait up to 5 seconds for the task to complete
assertTrue("Timeout waiting for ForkJoinPool task", latch.await(5, TimeUnit.SECONDS));

Throwable error = thrown.get();
assertNotNull("No error captured from ForkJoinPool task", error);
assertTrue(error instanceof Error);
assertEquals("future error", error.getMessage());
} finally {
fjp.shutdownNow();
fjp.awaitTermination(10, TimeUnit.SECONDS);
}
}

}
1 change: 1 addition & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ apply plugin: 'opensearch.rest-resources'

dependencies {
testImplementation project(":client:rest-high-level")
testImplementation project(":libs:opensearch-common")
}

restResources {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"Test cat thread_pool total_wait_time output":
- skip:
version: " - 2.10.99"
reason: thread_pool total_wait_time stats were introduced in V_2.11.0
version: " - 2.10.99,3.2.0 -"
reason: thread_pool total_wait_time stats were introduced in V_2.11.0; ForkJoinPool thread pool support was introduced in 3.2.0 and is tested separately.

- do:
cat.thread_pool: {}
Expand All @@ -27,8 +27,8 @@
---
"Test cat thread_pool total_wait_time output with concurrent search thread_pool":
- skip:
version: " - 2.11.99"
reason: index_search thread_pool was introduced in V_2.12.0
version: " - 2.11.99,3.2.0 -"
reason: index_search thread_pool was introduced in V_2.12.0; ForkJoinPool thread pool support was introduced in 3.2.0 and is tested separately.

- do:
cat.thread_pool: {}
Expand All @@ -54,6 +54,9 @@
---
"Test cat thread_pool output":
- skip:
version: "3.2.0 -,"
reason: ForkJoinPool thread pool support was introduced in 3.2.0 and is tested separately.

- do:
cat.thread_pool: {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
---
"Test cat thread_pool fork_join output":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join
h: node_name,name,type,active,queue,queue_size,rejected,largest,completed,max,keep_alive
v: true
# At least one row must contain 'fork_join' in name, optionally in type, and 0 for active, -1 for queue_size
- match:
$body: /fork_join.*0.*-1/

---
"Test cat thread_pool fork_join with unrelated pool":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join,search
h: name,type,active,queue,queue_size,rejected
v: true
# At least one fork_join row must exist
- match:
$body: /fork_join.*0.*-1/
# At least one search row, queue_size may be -1 or 1000, type can be blank or 'resizable'
- match:
$body: /search.*0.*(-1|1000)/

---
"Test cat thread_pool fork_join with only fork_join present":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join
h: name,type,active,queue,queue_size,rejected
v: true
- match:
$body: /fork_join.*0.*-1/

---
"Test cat thread_pool fork_join with custom header order":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join
h: queue,active,name,type
v: true
# Accept row with 0 queue, 0 active, fork_join in name, type may appear or not
- match:
$body: /0.*0.*fork_join/

---
"Test cat thread_pool fork_join missing optional columns":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join
h: name,type,keep_alive,queue_size,max
v: true
# At least one fork_join row with -1 in keep_alive
- match:
$body: /fork_join.*-1/

---
"Test cat thread_pool fork_join with unknown header":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join
h: name,type,foo,active
v: true
# At least one fork_join row with 0 in active
- match:
$body: /fork_join.*0/

---
"Test cat thread_pool fork_join with v:false":
- skip:
version: " - 3.1.99"
reason: fork_join thread pool support was introduced in 3.2.0
- do:
cat.thread_pool:
thread_pool_patterns: fork_join
h: name,type,active
v: false
# At least one row with fork_join and 0 in active
- match:
$body: /fork_join.*0/
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ protected Table getTableWithHeader(final RestRequest request) {
return table;
}

// ... [imports and class definition unchanged] ...

private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoResponse nodesInfo, NodesStatsResponse nodesStats) {
final String[] threadPools = req.paramAsStringArray("thread_pool_patterns", new String[] { "*" });
final DiscoveryNodes nodes = state.getState().nodes();
Expand All @@ -196,6 +198,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
}
}

Set<String> forkJoinEmitted = new HashSet<>();
for (final DiscoveryNode node : nodes) {
final NodeInfo info = nodesInfo.getNodesMap().get(node.getId());
final NodeStats stats = nodesStats.getNodesMap().get(node.getId());
Expand Down Expand Up @@ -225,6 +228,44 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR

if (!included.contains(entry.getKey())) continue;

final ThreadPoolStats.Stats poolStats = entry.getValue();
final ThreadPool.Info poolInfo = poolThreadInfo.get(entry.getKey());

boolean isForkJoin = poolInfo != null && poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.FORK_JOIN;
if (isForkJoin) {
// only emit a single row for fork_join for the whole cluster
if (forkJoinEmitted.contains(entry.getKey())) continue;
forkJoinEmitted.add(entry.getKey());

table.startRow();

table.addCell(node.getName());
table.addCell(node.getId());
table.addCell(node.getEphemeralId());
table.addCell(info == null ? null : info.getInfo(ProcessInfo.class).getId());
table.addCell(node.getHostName());
table.addCell(node.getHostAddress());
table.addCell(node.getAddress().address().getPort());
// pool columns
table.addCell(entry.getKey());
table.addCell(poolInfo.getThreadPoolType().getType());
// ForkJoinPool: most stats undefined or 0/-1
table.addCell(0); // active
table.addCell(0); // pool_size
table.addCell(0); // queue
table.addCell(-1); // queue_size
table.addCell(0); // rejected
table.addCell(0); // largest
table.addCell(0); // completed
table.addCell(-1); // total_wait_time
table.addCell(null); // core
table.addCell(null); // max
table.addCell(null); // size
table.addCell(null); // keep_alive
table.endRow();
continue;
}

table.startRow();

table.addCell(node.getName());
Expand All @@ -234,8 +275,6 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
table.addCell(node.getHostName());
table.addCell(node.getHostAddress());
table.addCell(node.getAddress().address().getPort());
final ThreadPoolStats.Stats poolStats = entry.getValue();
final ThreadPool.Info poolInfo = poolThreadInfo.get(entry.getKey());

Long maxQueueSize = null;
String keepAlive = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.threadpool;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.node.Node;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

/**
* A builder for fork join executors.
*
* @opensearch.internal
*/
public final class ForkJoinPoolExecutorBuilder extends ExecutorBuilder<ForkJoinPoolExecutorBuilder.ForkJoinPoolExecutorSettings> {

private final Setting<Integer> parallelismSetting;

public ForkJoinPoolExecutorBuilder(final String name, final int parallelism) {
this(name, parallelism, "thread_pool." + name);
}

public ForkJoinPoolExecutorBuilder(final String name, final int parallelism, final String prefix) {
super(name);
this.parallelismSetting = Setting.intSetting(settingsKey(prefix, "parallelism"), parallelism, Setting.Property.NodeScope);
}

@Override
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(parallelismSetting);
}

@Override
ForkJoinPoolExecutorSettings getSettings(Settings settings) {
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
final int parallelism = parallelismSetting.get(settings);
return new ForkJoinPoolExecutorSettings(nodeName, parallelism);
}

@Override
ThreadPool.ExecutorHolder build(final ForkJoinPoolExecutorSettings settings, final ThreadContext threadContext) {
int parallelism = settings.parallelism;
ForkJoinWorkerThreadFactory factory = pool -> {
ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName(OpenSearchExecutors.threadName(settings.nodeName, name()));
return worker;
};
final ForkJoinPool executor = new ForkJoinPool(parallelism, factory, null, false);

final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FORK_JOIN, parallelism, parallelism, null, null);
return new ThreadPool.ExecutorHolder(executor, info);
}

@Override
String formatInfo(ThreadPool.Info info) {
return String.format(Locale.ROOT, "name [%s], parallelism [%d]", info.getName(), info.getMax());
}

static class ForkJoinPoolExecutorSettings extends ExecutorBuilder.ExecutorSettings {
private final int parallelism;

ForkJoinPoolExecutorSettings(final String nodeName, final int parallelism) {
super(nodeName);
this.parallelism = parallelism;
}
}
}
Loading
Loading