Skip to content

Commit f96fd0e

Browse files
author
Srikanth Padakanti
committed
Initial chekpoint Feature Request - Add support for a ForkJoinPool type
Signed-off-by: Srikanth Padakanti <[email protected]>
1 parent e8e2e83 commit f96fd0e

File tree

4 files changed

+177
-16
lines changed

4 files changed

+177
-16
lines changed

qa/evil-tests/src/test/java/org/opensearch/threadpool/EvilThreadPoolTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import java.util.concurrent.ExecutorService;
4848
import java.util.concurrent.ScheduledThreadPoolExecutor;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.ForkJoinPool;
51+
5052
import java.util.concurrent.atomic.AtomicReference;
5153
import java.util.function.Consumer;
5254

@@ -70,6 +72,11 @@ public void tearDownThreadPool() {
7072

7173
public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
7274
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
75+
// ForkJoinPool is skipped here because it does not support all ThreadPoolExecutor features or APIs,
76+
// and is tested separately in testExecutionErrorOnForkJoinPool.
77+
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.FORK_JOIN) {
78+
continue; // skip FORK_JOIN for these tests
79+
}
7380
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
7481
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
7582
checkExecutionError(getScheduleRunner(executor));
@@ -176,6 +183,11 @@ protected void doRun() {
176183

177184
public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
178185
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
186+
// ForkJoinPool is skipped here because it does not support all ThreadPoolExecutor features or APIs,
187+
// and is tested separately in testExecutionErrorOnForkJoinPool.
188+
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.FORK_JOIN) {
189+
continue; // skip FORK_JOIN for these tests
190+
}
179191
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), true);
180192

181193
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
@@ -391,4 +403,43 @@ private void runExecutionTest(
391403
}
392404
}
393405

406+
public void testExecutionExceptionOnForkJoinPool() throws InterruptedException {
407+
ForkJoinPool fjp = new ForkJoinPool();
408+
try {
409+
checkExecutionException(getExecuteRunner(fjp), true);
410+
checkExecutionException(getSubmitRunner(fjp), false);
411+
} finally {
412+
fjp.shutdownNow();
413+
fjp.awaitTermination(10, TimeUnit.SECONDS);
414+
}
415+
}
416+
417+
public void testExecutionErrorOnForkJoinPool() throws Exception {
418+
ForkJoinPool fjp = new ForkJoinPool(8);
419+
final CountDownLatch latch = new CountDownLatch(1);
420+
final AtomicReference<Throwable> thrown = new AtomicReference<>();
421+
try {
422+
fjp.execute(() -> {
423+
try {
424+
throw new Error("future error");
425+
} catch (Throwable t) {
426+
thrown.set(t);
427+
} finally {
428+
latch.countDown();
429+
}
430+
});
431+
432+
// Wait up to 5 seconds for the task to complete
433+
assertTrue("Timeout waiting for ForkJoinPool task", latch.await(5, TimeUnit.SECONDS));
434+
435+
Throwable error = thrown.get();
436+
assertNotNull("No error captured from ForkJoinPool task", error);
437+
assertTrue(error instanceof Error);
438+
assertEquals("future error", error.getMessage());
439+
} finally {
440+
fjp.shutdownNow();
441+
fjp.awaitTermination(10, TimeUnit.SECONDS);
442+
}
443+
}
444+
394445
}

rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"Test cat thread_pool total_wait_time output":
22
- skip:
3-
version: " - 2.10.99"
4-
reason: thread_pool total_wait_time stats were introduced in V_2.11.0
3+
version: " - 2.10.99,3.2.0-"
4+
reason: thread_pool total_wait_time stats were introduced in V_2.11.0; fork_join thread pool not present before 3.2.0
55

66
- do:
77
cat.thread_pool: {}
@@ -129,3 +129,32 @@
129129
/ #node_name name active queue rejected
130130
^ (\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n
131131
\S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
132+
133+
# NEW TEST for 3.2.0 and later (expect fork_join)
134+
---
135+
"Test cat thread_pool total_wait_time output (with fork_join)":
136+
- skip:
137+
version: " - 3.1.99"
138+
reason: fork_join thread pool introduced in 3.2.0
139+
140+
- do:
141+
cat.thread_pool: {}
142+
143+
- match:
144+
$body: |
145+
/ #node_name name active queue rejected
146+
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
147+
148+
- do:
149+
cat.thread_pool:
150+
thread_pool_patterns: search,search_throttled,generic,fork_join
151+
h: name,total_wait_time,twt
152+
v: true
153+
154+
- match:
155+
$body: |
156+
/^ name \s+ total_wait_time \s+ twt \n
157+
(generic \s+ -1 \s+ -1 \n
158+
fork_join \s+ -1 \s+ -1 \n
159+
search \s+ \d*\.*\d*\D+ \s+ \d*\.*\d*\D+ \n
160+
search_throttled \s+ \d*\.*\d*\D+ \s+ \d*\.*\d*\D+ \n)+ $/

server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -262,16 +262,45 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
262262
}
263263
}
264264

265+
// --- BEGIN CHANGE: Normalize ForkJoinPool stats to 0 ---
266+
boolean isForkJoin = poolInfo != null && poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.FORK_JOIN;
267+
268+
int active = (poolStats == null ? 0 : poolStats.getActive());
269+
int threads = (poolStats == null ? 0 : poolStats.getThreads());
270+
int queue = (poolStats == null ? 0 : poolStats.getQueue());
271+
long rejected = (poolStats == null ? 0 : poolStats.getRejected());
272+
int largest = (poolStats == null ? 0 : poolStats.getLargest());
273+
long completed = (poolStats == null ? 0 : poolStats.getCompleted());
274+
long waitTime = (poolStats == null ? 0 : poolStats.getWaitTime().nanos());
275+
276+
// Normalize ForkJoinPool stats to 0 or -1 as per convention
277+
if (isForkJoin) {
278+
active = 0;
279+
threads = 0;
280+
queue = 0;
281+
rejected = 0;
282+
largest = 0;
283+
completed = 0;
284+
waitTime = -1; // if wait time is unsupported
285+
// Set maxQueueSize to -1 if not supported for ForkJoinPool
286+
maxQueueSize = -1L;
287+
core = null;
288+
max = null;
289+
size = null;
290+
keepAlive = null;
291+
}
292+
// --- END CHANGE ---
293+
265294
table.addCell(entry.getKey());
266295
table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType().getType());
267-
table.addCell(poolStats == null ? null : poolStats.getActive());
268-
table.addCell(poolStats == null ? null : poolStats.getThreads());
269-
table.addCell(poolStats == null ? null : poolStats.getQueue());
296+
table.addCell(active);
297+
table.addCell(threads);
298+
table.addCell(queue);
270299
table.addCell(maxQueueSize == null ? -1 : maxQueueSize);
271-
table.addCell(poolStats == null ? null : poolStats.getRejected());
272-
table.addCell(poolStats == null ? null : poolStats.getLargest());
273-
table.addCell(poolStats == null ? null : poolStats.getCompleted());
274-
table.addCell(poolStats == null ? null : poolStats.getWaitTime());
300+
table.addCell(rejected);
301+
table.addCell(largest);
302+
table.addCell(completed);
303+
table.addCell(waitTime);
275304
table.addCell(core);
276305
table.addCell(max);
277306
table.addCell(size);

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,10 @@ private void validateSetting(Settings tpSettings) {
469469
}
470470
Settings tpGroup = entry.getValue();
471471
ExecutorHolder holder = executors.get(tpName);
472+
// Skip validation for ForkJoinPool type since it does not support these settings
473+
if (holder.info.type == ThreadPoolType.FORK_JOIN) {
474+
continue;
475+
}
472476
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
473477
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor;
474478
if (holder.info.type == ThreadPoolType.SCALING) {
@@ -504,6 +508,9 @@ public void setThreadPool(Settings tpSettings) {
504508
String tpName = entry.getKey();
505509
Settings tpGroup = entry.getValue();
506510
ExecutorHolder holder = executors.get(tpName);
511+
if (holder.info.type == ThreadPoolType.FORK_JOIN) {
512+
continue;
513+
}
507514
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
508515
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) holder.executor;
509516
if (holder.info.type == ThreadPoolType.SCALING) {
@@ -543,6 +550,11 @@ public ThreadPoolStats stats() {
543550
if ("same".equals(name)) {
544551
continue;
545552
}
553+
if (holder.info.type == ThreadPoolType.FORK_JOIN) {
554+
// Add ForkJoinPool with sentinel values
555+
stats.add(new ThreadPoolStats.Stats(name, 0, 0, 0, 0, 0, 0, -1));
556+
continue;
557+
}
546558
int threads = -1;
547559
int queue = -1;
548560
int active = -1;
@@ -664,8 +676,11 @@ public void shutdown() {
664676
stopCachedTimeThread();
665677
scheduler.shutdown();
666678
for (ExecutorHolder executor : executors.values()) {
667-
if (executor.executor() instanceof ThreadPoolExecutor || executor.executor() instanceof ForkJoinPool) {
668-
executor.executor().shutdown();
679+
ExecutorService es = executor.executor();
680+
if (es instanceof ThreadPoolExecutor) {
681+
es.shutdown();
682+
} else if (es instanceof ForkJoinPool) {
683+
es.shutdown();
669684
}
670685
}
671686
}
@@ -674,23 +689,50 @@ public void shutdownNow() {
674689
stopCachedTimeThread();
675690
scheduler.shutdownNow();
676691
for (ExecutorHolder executor : executors.values()) {
677-
if (executor.executor() instanceof ThreadPoolExecutor || executor.executor() instanceof ForkJoinPool) {
678-
executor.executor().shutdownNow();
692+
ExecutorService es = executor.executor();
693+
if (es instanceof ThreadPoolExecutor) {
694+
es.shutdownNow();
695+
} else if (es instanceof ForkJoinPool) {
696+
es.shutdownNow(); // same as shutdown(), but explicit to the reader
679697
}
680698
}
681699
}
682700

683701
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
684-
boolean result = scheduler.awaitTermination(timeout, unit);
702+
long nanos = unit.toNanos(timeout);
703+
long deadline = System.nanoTime() + nanos;
704+
boolean result = scheduler.awaitTermination(Math.max(0, nanos), TimeUnit.NANOSECONDS);
705+
long now = System.nanoTime();
706+
nanos = deadline - now;
707+
685708
for (ExecutorHolder executor : executors.values()) {
686709
if (executor.executor() instanceof ThreadPoolExecutor || executor.executor() instanceof ForkJoinPool) {
687-
result &= executor.executor().awaitTermination(timeout, unit);
710+
if (nanos <= 0) {
711+
result = false;
712+
break;
713+
}
714+
result &= executor.executor().awaitTermination(Math.max(0, nanos), TimeUnit.NANOSECONDS);
715+
now = System.nanoTime();
716+
nanos = deadline - now;
688717
}
689718
}
690-
cachedTimeThread.join(unit.toMillis(timeout));
719+
if (nanos > 0) {
720+
cachedTimeThread.join(TimeUnit.NANOSECONDS.toMillis(nanos));
721+
}
691722
return result;
692723
}
693724

725+
// public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
726+
// boolean result = scheduler.awaitTermination(timeout, unit);
727+
// for (ExecutorHolder executor : executors.values()) {
728+
// if (executor.executor() instanceof ThreadPoolExecutor || executor.executor() instanceof ForkJoinPool) {
729+
// result &= executor.executor().awaitTermination(timeout, unit);
730+
// }
731+
// }
732+
// cachedTimeThread.join(unit.toMillis(timeout));
733+
// return result;
734+
// }
735+
694736
public ScheduledExecutorService scheduler() {
695737
return this.scheduler;
696738
}
@@ -948,6 +990,10 @@ public void writeTo(StreamOutput out) throws IOException {
948990
// Opensearch on older version doesn't know about "resizable" thread pool. Convert RESIZABLE to FIXED
949991
// to avoid serialization/de-serization issue between nodes with different OpenSearch version
950992
out.writeString(ThreadPoolType.FIXED.getType());
993+
} else if (type == ThreadPoolType.FORK_JOIN && out.getVersion().before(Version.V_3_2_0)) {
994+
// Opensearch on older version doesn't know about "fork_join" thread pool. Convert FORK_JOIN to FIXED (or SCALING, or
995+
// another safe fallback)
996+
out.writeString(ThreadPoolType.FIXED.getType());
951997
} else {
952998
out.writeString(type.getType());
953999
}
@@ -993,6 +1039,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
9931039
builder.field("core", min);
9941040
assert max != -1;
9951041
builder.field("max", max);
1042+
} else if (type == ThreadPoolType.FORK_JOIN) {
1043+
builder.field("size", -1);
1044+
builder.field("min", -1);
1045+
builder.field("max", -1);
1046+
builder.field("keep_alive", (Object) null);
1047+
builder.field("queue_size", -1);
9961048
} else {
9971049
assert max != -1;
9981050
builder.field("size", max);

0 commit comments

Comments
 (0)