Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change the default value of doc_values in WildcardFieldMapper to true. ([#19796](https://github.com/opensearch-project/OpenSearch/pull/19796))
- Make Engine#loadHistoryUUID() protected and Origin#isFromTranslog() public ([#19753](https://github.com/opensearch-project/OpenSearch/pull/19752))
- Bump opensearch-protobufs dependency to 0.23.0 and update transport-grpc module compatibility ([#19831](https://github.com/opensearch-project/OpenSearch/pull/19831))
- Refactor the RefreshStats class to use the Builder pattern instead of constructors ([#19835](https://github.com/opensearch-project/OpenSearch/pull/19835))

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down Expand Up @@ -74,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Deprecated
- Deprecated existing constructors in ThreadPoolStats.Stats in favor of the new Builder ([#19317](https://github.com/opensearch-project/OpenSearch/pull/19317))
- Deprecated existing constructors in IndexingStats.Stats in favor of the new Builder ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306))
- Deprecated existing constructors in RefreshStats in favor of the new Builder ([#19835](https://github.com/opensearch-project/OpenSearch/pull/19835))

### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(listeners);
}

/**
* This constructor will be deprecated starting in version 3.4.0.
* Use {@link Builder} instead.
*/
@Deprecated
public RefreshStats(long total, long totalTimeInMillis, long externalTotal, long externalTotalTimeInMillis, int listeners) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
Expand Down Expand Up @@ -167,6 +172,66 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

/**
* Private constructor that takes a builder.
* This is the sole entry point for creating a new RefreshStats object.
* @param builder The builder instance containing all the values.
*/
private RefreshStats(Builder builder) {
this.total = builder.total;
this.totalTimeInMillis = builder.totalTimeInMillis;
this.externalTotal = builder.externalTotal;
this.externalTotalTimeInMillis = builder.externalTotalTimeInMillis;
this.listeners = builder.listeners;
}

/**
* Builder for the {@link RefreshStats} class.
* Provides a fluent API for constructing instances.
*/
public static class Builder {
private long total = 0;
private long totalTimeInMillis = 0;
private long externalTotal = 0;
private long externalTotalTimeInMillis = 0;
private int listeners = 0;

public Builder() {}

public Builder total(long total) {
this.total = total;
return this;
}

public Builder totalTimeInMillis(long time) {
this.totalTimeInMillis = time;
return this;
}

public Builder externalTotal(long total) {
this.externalTotal = total;
return this;
}

public Builder externalTotalTimeInMillis(long time) {
this.externalTotalTimeInMillis = time;
return this;
}

public Builder listeners(int listeners) {
this.listeners = listeners;
return this;
}

/**
* Creates a {@link RefreshStats} object from the builder's current state.
* @return A new RefreshStats instance.
*/
public RefreshStats build() {
return new RefreshStats(this);
}
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != RefreshStats.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1503,13 +1503,12 @@ public long getWritingBytes() {

public RefreshStats refreshStats() {
int listeners = refreshListeners.pendingCount();
return new RefreshStats(
refreshMetric.count(),
TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()),
externalRefreshMetric.count(),
TimeUnit.NANOSECONDS.toMillis(externalRefreshMetric.sum()),
listeners
);
return new RefreshStats.Builder().total(refreshMetric.count())
.totalTimeInMillis(TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()))
.externalTotal(externalRefreshMetric.count())
.externalTotalTimeInMillis(TimeUnit.NANOSECONDS.toMillis(externalRefreshMetric.sum()))
.listeners(listeners)
.build();
}

public FlushStats flushStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public Stats(StreamInput in) throws IOException {
}

/**
* This constructor will be deprecated starting in version 3.3.0.
* This constructor will be deprecated starting in version 3.4.0.
* Use {@link Builder} instead.
*/
@Deprecated
Expand Down Expand Up @@ -243,7 +243,7 @@ public Stats(
}

/**
* This constructor will be deprecated starting in version 3.3.0.
* This constructor will be deprecated starting in version 3.4.0.
* Use {@link Builder} instead.
*/
@Deprecated
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,18 @@ public ThreadPoolStats stats() {
continue;
}
if (holder.info.type == ThreadPoolType.FORK_JOIN) {
stats.add(new ThreadPoolStats.Stats(name, 0, 0, 0, 0, 0, 0, -1, holder.info.getMax()));
stats.add(
new ThreadPoolStats.Stats.Builder().name(name)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(holder.info.getMax())
.build()
);
continue;
}
int threads = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private Stats(Builder builder) {
}

/**
* This constructor will be deprecated starting in version 3.3.0.
* This constructor will be deprecated starting in version 3.4.0.
* Use {@link Builder} instead.
*/
@Deprecated
Expand All @@ -107,6 +107,11 @@ public Stats(String name, int threads, int queue, int active, long rejected, int
this.parallelism = -1;
}

/**
* This constructor will be deprecated starting in version 3.4.0.
* Use {@link Builder} instead.
*/
@Deprecated
public Stats(
String name,
int threads,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,18 @@ private ExecutorService setupForceMergeThreadPool(int threadCount) {
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);

ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, threadCount, 0, 0, 0, threadCount, 0, -1, -1))
Arrays.asList(
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(threadCount)
.queue(0)
.active(0)
.rejected(0)
.largest(threadCount)
.completed(0)
.waitTimeNanos(-1)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@
public class RefreshStatsTests extends OpenSearchTestCase {

public void testSerialize() throws IOException {
RefreshStats stats = new RefreshStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
between(0, Integer.MAX_VALUE)
);
RefreshStats stats = new RefreshStats.Builder().total(randomNonNegativeLong())
.totalTimeInMillis(randomNonNegativeLong())
.externalTotal(randomNonNegativeLong())
.externalTotalTimeInMillis(randomNonNegativeLong())
.listeners(between(0, Integer.MAX_VALUE))
.build();

BytesStreamOutput out = new BytesStreamOutput();
stats.writeTo(out);
StreamInput input = out.bytes().streamInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,16 @@ public void testForkJoinRow() {
final int parallelism = 7;

ThreadPool.Info fjInfo = new ThreadPool.Info(poolName, ThreadPool.ThreadPoolType.FORK_JOIN, parallelism, parallelism, null, null);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats(poolName, 0, 0, 0, 0, 0, 0, -1, parallelism);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelism)
.build();

Table table = action.getTableWithHeader(new FakeRestRequest.Builder(xContentRegistry()).build());
action.writeRow(table, nodeName, nodeId, eid, pid, host, ip, port, poolName, fjInfo, dummyStats);
Expand Down Expand Up @@ -93,7 +102,16 @@ public void testNonForkJoinRowScaling() {
final String poolName = "generic";

ThreadPool.Info scalingInfo = new ThreadPool.Info(poolName, ThreadPool.ThreadPoolType.SCALING, 1, 4, null, null);
ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats(poolName, 3, 2, 1, 5L, 3, 10L, 111L, -1);
ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(3)
.queue(2)
.active(1)
.rejected(5L)
.largest(3)
.completed(10L)
.waitTimeNanos(111L)
.parallelism(-1)
.build();

Table table = action.getTableWithHeader(new FakeRestRequest.Builder(xContentRegistry()).build());
action.writeRow(table, nodeName, nodeId, eid, pid, host, ip, port, poolName, scalingInfo, stats);
Expand All @@ -120,7 +138,17 @@ public void testForkJoinRowParallelismZero() {
final String poolName = "fj_zero";
final int parallelism = 0;
ThreadPool.Info fjInfo = new ThreadPool.Info(poolName, ThreadPool.ThreadPoolType.FORK_JOIN, parallelism, parallelism, null, null);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats(poolName, 0, 0, 0, 0, 0, 0, -1, parallelism);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelism)
.build();

Table table = action.getTableWithHeader(new FakeRestRequest.Builder(xContentRegistry()).build());
action.writeRow(table, "n", "id", "eid", 1L, "h", "ip", 9300, poolName, fjInfo, dummyStats);
assertEquals(parallelism, table.getRows().get(0).get(indexOf(table).get("parallelism")).value);
Expand All @@ -130,7 +158,16 @@ public void testForkJoinRowParallelismNegative() {
final String poolName = "fj_negative";
final int parallelism = -5;
ThreadPool.Info fjInfo = new ThreadPool.Info(poolName, ThreadPool.ThreadPoolType.FORK_JOIN, parallelism, parallelism, null, null);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats(poolName, 0, 0, 0, 0, 0, 0, -1, parallelism);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelism)
.build();

Table table = action.getTableWithHeader(new FakeRestRequest.Builder(xContentRegistry()).build());
action.writeRow(table, "n", "id", "eid", 1L, "h", "ip", 9300, poolName, fjInfo, dummyStats);
Expand All @@ -141,7 +178,16 @@ public void testForkJoinRowNullInfo() {
final String poolName = "fj_nullinfo";
final int parallelism = 3;
ThreadPool.Info fjInfo = null; // null info
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats(poolName, 0, 0, 0, 0, 0, 0, -1, parallelism);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelism)
.build();

Table table = action.getTableWithHeader(new FakeRestRequest.Builder(xContentRegistry()).build());
action.writeRow(table, "n", "id", "eid", 1L, "h", "ip", 9300, poolName, fjInfo, dummyStats);
Expand Down Expand Up @@ -177,7 +223,16 @@ public void testMultipleForkJoinRows() {
null,
null
);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats(poolNames[i], 0, 0, 0, 0, 0, 0, -1, parallelisms[i]);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats.Builder().name(poolNames[i])
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelisms[i])
.build();
action.writeRow(table, "n" + i, "id" + i, "eid" + i, 1L, "h" + i, "ip" + i, 9300 + i, poolNames[i], fjInfo, dummyStats);
}
assertEquals(2, table.getRows().size());
Expand All @@ -190,7 +245,16 @@ public void testForkJoinRowLargeParallelism() {
final String poolName = "fj_large";
final int parallelism = Integer.MAX_VALUE;
ThreadPool.Info fjInfo = new ThreadPool.Info(poolName, ThreadPool.ThreadPoolType.FORK_JOIN, parallelism, parallelism, null, null);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats(poolName, 0, 0, 0, 0, 0, 0, -1, parallelism);
ThreadPoolStats.Stats dummyStats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelism)
.build();

Table table = action.getTableWithHeader(new FakeRestRequest.Builder(xContentRegistry()).build());
action.writeRow(table, "n", "id", "eid", 1L, "h", "ip", 9300, poolName, fjInfo, dummyStats);
Expand Down Expand Up @@ -322,7 +386,16 @@ public void testBuildTableWithForkJoinPool() throws Exception {
when(nodesInfoResponse.getNodesMap()).thenReturn(nodeInfoMap);

// 5. ThreadPoolStats.Stats for ForkJoin
ThreadPoolStats.Stats fjStats = new ThreadPoolStats.Stats(poolName, 0, 0, 0, 0, 0, 0, -1, parallelism);
ThreadPoolStats.Stats fjStats = new ThreadPoolStats.Stats.Builder().name(poolName)
.threads(0)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0)
.waitTimeNanos(-1)
.parallelism(parallelism)
.build();
ThreadPoolStats threadPoolStats = new ThreadPoolStats(new ArrayList<>(List.of(fjStats)));
NodeStats nodeStats = mock(NodeStats.class);
when(nodeStats.getThreadPool()).thenReturn(threadPoolStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@ public class RestThreadPoolActionTests extends OpenSearchTestCase {

public void testForkJoinPoolTypeStatsAreReported() {
// Setup for ForkJoinPool stats
ThreadPoolStats.Stats fjStats = new ThreadPoolStats.Stats(
"fork_join", // name
42, // active
84, // rejected
21, // largest
64, // completed
-1, // queue (should be -1 for FJ)
1, // threads
0, // taskTimeNanos (or whatever the last arg is)
8 // parallelism (for example: 8, or whatever is appropriate for your test)
);
ThreadPoolStats.Stats fjStats = new ThreadPoolStats.Stats.Builder().name("fork_join")
.threads(1)
.queue(-1) // should be -1 for FJ
.active(42)
.rejected(84)
.largest(21)
.completed(64)
.waitTimeNanos(0) // or whatever the last arg is
.parallelism(8) // for example: 8, or whatever is appropriate for your test
.build();

List<ThreadPoolStats.Stats> statsList = Collections.singletonList(fjStats);

Expand Down
Loading
Loading