Skip to content

Commit f26c9f8

Browse files
committed
Stats API refactor; Cancellation of stream through StreamTransportResponse
Signed-off-by: Rishabh Maurya <[email protected]>
1 parent: 2233764 · commit: f26c9f8

File tree

15 files changed: 574 additions (+574) and 303 deletions (-303)

15 files changed: 574 additions (+574) and 303 deletions (-303)

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/stats/FlightStatsCollector.java

Lines changed: 53 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -48,13 +48,14 @@ public class FlightStatsCollector extends AbstractLifecycleComponent {
4848
private final AtomicLong clientBatchTimeMax = new AtomicLong();
4949

5050
// Shared metrics
51-
private final AtomicLong bytesSentTotal = new AtomicLong();
52-
private final AtomicLong bytesReceivedTotal = new AtomicLong();
53-
private final AtomicLong streamErrorsTotal = new AtomicLong();
54-
private final AtomicLong connectionErrorsTotal = new AtomicLong();
55-
private final AtomicLong timeoutErrorsTotal = new AtomicLong();
56-
private final AtomicLong streamsCompletedSuccessfully = new AtomicLong();
57-
private final AtomicLong streamsFailedTotal = new AtomicLong();
51+
private final AtomicLong bytesSent = new AtomicLong();
52+
private final AtomicLong bytesReceived = new AtomicLong();
53+
private final AtomicLong clientApplicationErrors = new AtomicLong();
54+
private final AtomicLong clientTransportErrors = new AtomicLong();
55+
private final AtomicLong serverApplicationErrors = new AtomicLong();
56+
private final AtomicLong serverTransportErrors = new AtomicLong();
57+
private final AtomicLong clientStreamsCompleted = new AtomicLong();
58+
private final AtomicLong serverStreamsCompleted = new AtomicLong();
5859
private final long startTimeMillis = System.currentTimeMillis();
5960

6061
private final AtomicLong channelsActive = new AtomicLong();
@@ -107,18 +108,19 @@ public FlightTransportStats collectStats() {
107108
totalClientBatches,
108109
totalClientResponses,
109110
totalServerBatches,
110-
bytesSentTotal.get(),
111-
bytesReceivedTotal.get()
111+
bytesSent.get(),
112+
bytesReceived.get()
112113
);
113114

114115
ResourceUtilizationStats resourceUtilization = collectResourceStats();
115116

116117
ReliabilityStats reliability = new ReliabilityStats(
117-
streamErrorsTotal.get(),
118-
connectionErrorsTotal.get(),
119-
timeoutErrorsTotal.get(),
120-
streamsCompletedSuccessfully.get(),
121-
streamsFailedTotal.get(),
118+
clientApplicationErrors.get(),
119+
clientTransportErrors.get(),
120+
serverApplicationErrors.get(),
121+
serverTransportErrors.get(),
122+
clientStreamsCompleted.get(),
123+
serverStreamsCompleted.get(),
122124
System.currentTimeMillis() - startTimeMillis
123125
);
124126

@@ -146,37 +148,44 @@ private ResourceUtilizationStats collectResourceStats() {
146148
directMemoryUsed = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
147149
}
148150

149-
int flightThreadsActive = 0;
150-
int flightThreadsTotal = 0;
151+
int clientThreadsActive = 0;
152+
int clientThreadsTotal = 0;
153+
int serverThreadsActive = 0;
154+
int serverThreadsTotal = 0;
151155

152156
if (threadPool != null) {
153157
try {
154158
var allStats = threadPool.stats();
155159
for (var stat : allStats) {
156-
if (ServerConfig.FLIGHT_SERVER_THREAD_POOL_NAME.equals(stat.getName())
157-
|| ServerConfig.FLIGHT_CLIENT_THREAD_POOL_NAME.equals(stat.getName())) {
158-
flightThreadsActive += stat.getActive();
159-
flightThreadsTotal += stat.getThreads();
160+
if (ServerConfig.FLIGHT_CLIENT_THREAD_POOL_NAME.equals(stat.getName())) {
161+
clientThreadsActive += stat.getActive();
162+
clientThreadsTotal += stat.getThreads();
163+
} else if (ServerConfig.FLIGHT_SERVER_THREAD_POOL_NAME.equals(stat.getName())) {
164+
serverThreadsActive += stat.getActive();
165+
serverThreadsTotal += stat.getThreads();
160166
}
161167
}
162168
} catch (Exception e) {
163169
// Ignore thread pool stats errors
164170
}
165171
}
166172

173+
// Add Netty event loop threads to server total
167174
if (bossEventLoopGroup != null && !bossEventLoopGroup.isShutdown()) {
168-
flightThreadsTotal += 1;
175+
serverThreadsTotal += 1;
169176
}
170177
if (workerEventLoopGroup != null && !workerEventLoopGroup.isShutdown()) {
171-
flightThreadsTotal += Runtime.getRuntime().availableProcessors() * 2;
178+
serverThreadsTotal += Runtime.getRuntime().availableProcessors() * 2;
172179
}
173180

174181
return new ResourceUtilizationStats(
175182
arrowAllocatedBytes,
176183
arrowPeakBytes,
177184
directMemoryUsed,
178-
flightThreadsActive,
179-
flightThreadsTotal,
185+
clientThreadsActive,
186+
clientThreadsTotal,
187+
serverThreadsActive,
188+
serverThreadsTotal,
180189
(int) channelsActive.get(),
181190
(int) channelsActive.get()
182191
);
@@ -257,58 +266,43 @@ public void addClientBatchTime(long timeMillis) {
257266
/** Adds bytes sent
258267
* @param bytes number of bytes */
259268
public void addBytesSent(long bytes) {
260-
bytesSentTotal.addAndGet(bytes);
269+
bytesSent.addAndGet(bytes);
261270
}
262271

263272
/** Adds bytes received
264273
* @param bytes number of bytes */
265274
public void addBytesReceived(long bytes) {
266-
bytesReceivedTotal.addAndGet(bytes);
275+
bytesReceived.addAndGet(bytes);
267276
}
268277

269-
/** Increments stream errors counter */
270-
public void incrementStreamErrors() {
271-
streamErrorsTotal.incrementAndGet();
278+
/** Increments client application errors counter */
279+
public void incrementClientApplicationErrors() {
280+
clientApplicationErrors.incrementAndGet();
272281
}
273282

274-
/** Increments connection errors counter */
275-
public void incrementConnectionErrors() {
276-
connectionErrorsTotal.incrementAndGet();
283+
/** Increments client transport errors counter */
284+
public void incrementClientTransportErrors() {
285+
clientTransportErrors.incrementAndGet();
277286
}
278287

279-
/** Increments timeout errors counter */
280-
public void incrementTimeoutErrors() {
281-
timeoutErrorsTotal.incrementAndGet();
288+
/** Increments server application errors counter */
289+
public void incrementServerApplicationErrors() {
290+
serverApplicationErrors.incrementAndGet();
282291
}
283292

284-
/** Increments serialization errors counter */
285-
public void incrementSerializationErrors() {
286-
streamErrorsTotal.incrementAndGet();
293+
/** Increments server transport errors counter */
294+
public void incrementServerTransportErrors() {
295+
serverTransportErrors.incrementAndGet();
287296
}
288297

289-
/** Increments transport errors counter */
290-
public void incrementTransportErrors() {
291-
streamErrorsTotal.incrementAndGet();
298+
/** Increments client streams completed counter */
299+
public void incrementClientStreamsCompleted() {
300+
clientStreamsCompleted.incrementAndGet();
292301
}
293302

294-
/** Increments channel errors counter */
295-
public void incrementChannelErrors() {
296-
streamErrorsTotal.incrementAndGet();
297-
}
298-
299-
/** Increments Flight server errors counter */
300-
public void incrementFlightServerErrors() {
301-
streamErrorsTotal.incrementAndGet();
302-
}
303-
304-
/** Increments completed streams counter */
305-
public void incrementStreamsCompleted() {
306-
streamsCompletedSuccessfully.incrementAndGet();
307-
}
308-
309-
/** Increments failed streams counter */
310-
public void incrementStreamsFailed() {
311-
streamsFailedTotal.incrementAndGet();
303+
/** Increments server streams completed counter */
304+
public void incrementServerStreamsCompleted() {
305+
serverStreamsCompleted.incrementAndGet();
312306
}
313307

314308
/** Increments active channels counter */

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/stats/FlightStatsResponse.java

Lines changed: 138 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,10 @@
1111
import org.opensearch.action.FailedNodeException;
1212
import org.opensearch.action.support.nodes.BaseNodesResponse;
1313
import org.opensearch.cluster.ClusterName;
14+
import org.opensearch.common.unit.TimeValue;
1415
import org.opensearch.core.common.io.stream.StreamInput;
1516
import org.opensearch.core.common.io.stream.StreamOutput;
17+
import org.opensearch.core.common.unit.ByteSizeValue;
1618
import org.opensearch.core.xcontent.ToXContentObject;
1719
import org.opensearch.core.xcontent.XContentBuilder;
1820

@@ -72,41 +74,163 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
7274
}
7375

7476
private void aggregateClusterStats(XContentBuilder builder, Params params) throws IOException {
77+
// Performance aggregates
7578
long totalServerRequests = 0;
7679
long totalServerRequestsCurrent = 0;
80+
long totalServerBatches = 0;
7781
long totalClientBatches = 0;
7882
long totalClientResponses = 0;
7983
long totalBytesSent = 0;
8084
long totalBytesReceived = 0;
81-
long totalStreamErrors = 0;
85+
long totalServerRequestTime = 0;
86+
long totalServerBatchTime = 0;
87+
long totalClientBatchTime = 0;
88+
89+
// Reliability aggregates
90+
long totalClientApplicationErrors = 0;
91+
long totalClientTransportErrors = 0;
92+
long totalServerApplicationErrors = 0;
93+
long totalServerTransportErrors = 0;
94+
long totalClientStreamsCompleted = 0;
95+
long totalServerStreamsCompleted = 0;
96+
long totalUptime = 0;
97+
98+
// Resource aggregates
99+
long totalArrowAllocated = 0;
100+
long totalArrowPeak = 0;
101+
long totalDirectMemory = 0;
102+
int totalClientThreadsActive = 0;
103+
int totalClientThreadsTotal = 0;
104+
int totalServerThreadsActive = 0;
105+
int totalServerThreadsTotal = 0;
106+
int totalConnections = 0;
107+
int totalChannels = 0;
82108

83109
for (FlightNodeStats nodeStats : getNodes()) {
84110
FlightTransportStats stats = nodeStats.getFlightStats();
111+
112+
// Performance
85113
totalServerRequests += stats.performance.serverRequestsReceived;
86114
totalServerRequestsCurrent += stats.performance.serverRequestsCurrent;
115+
totalServerBatches += stats.performance.serverBatchesSent;
87116
totalClientBatches += stats.performance.clientBatchesReceived;
88117
totalClientResponses += stats.performance.clientResponsesReceived;
89-
totalBytesSent += stats.performance.bytesSentTotal;
90-
totalBytesReceived += stats.performance.bytesReceivedTotal;
91-
totalStreamErrors += stats.reliability.streamErrorsTotal;
118+
totalBytesSent += stats.performance.bytesSent;
119+
totalBytesReceived += stats.performance.bytesReceived;
120+
totalServerRequestTime += stats.performance.serverRequestTotalMillis;
121+
totalServerBatchTime += stats.performance.serverBatchTotalMillis;
122+
totalClientBatchTime += stats.performance.clientBatchTotalMillis;
123+
124+
// Reliability
125+
totalClientApplicationErrors += stats.reliability.clientApplicationErrors;
126+
totalClientTransportErrors += stats.reliability.clientTransportErrors;
127+
totalServerApplicationErrors += stats.reliability.serverApplicationErrors;
128+
totalServerTransportErrors += stats.reliability.serverTransportErrors;
129+
totalClientStreamsCompleted += stats.reliability.clientStreamsCompleted;
130+
totalServerStreamsCompleted += stats.reliability.serverStreamsCompleted;
131+
totalUptime = Math.max(totalUptime, stats.reliability.uptimeMillis);
132+
133+
// Resources
134+
totalArrowAllocated += stats.resourceUtilization.arrowAllocatedBytes;
135+
totalArrowPeak = Math.max(totalArrowPeak, stats.resourceUtilization.arrowPeakBytes);
136+
totalDirectMemory += stats.resourceUtilization.directMemoryBytes;
137+
totalClientThreadsActive += stats.resourceUtilization.clientThreadsActive;
138+
totalClientThreadsTotal += stats.resourceUtilization.clientThreadsTotal;
139+
totalServerThreadsActive += stats.resourceUtilization.serverThreadsActive;
140+
totalServerThreadsTotal += stats.resourceUtilization.serverThreadsTotal;
141+
totalConnections += stats.resourceUtilization.connectionsActive;
142+
totalChannels += stats.resourceUtilization.channelsActive;
92143
}
93144

145+
// Performance stats
94146
builder.startObject("performance");
95-
builder.field("total_server_requests", totalServerRequests);
96-
builder.field("total_server_requests_current", totalServerRequestsCurrent);
97-
builder.field("total_client_batches", totalClientBatches);
98-
builder.field("total_client_responses", totalClientResponses);
99-
builder.field("total_bytes_sent", totalBytesSent);
100-
builder.field("total_bytes_received", totalBytesReceived);
147+
builder.field("server_requests_total", totalServerRequests);
148+
builder.field("server_requests_current", totalServerRequestsCurrent);
149+
builder.field("server_batches_sent", totalServerBatches);
150+
builder.field("client_batches_received", totalClientBatches);
151+
builder.field("client_responses_received", totalClientResponses);
152+
builder.field("bytes_sent", totalBytesSent);
153+
if (params.paramAsBoolean("human", false)) {
154+
builder.field("bytes_sent_human", new ByteSizeValue(totalBytesSent).toString());
155+
}
156+
builder.field("bytes_received", totalBytesReceived);
157+
if (params.paramAsBoolean("human", false)) {
158+
builder.field("bytes_received_human", new ByteSizeValue(totalBytesReceived).toString());
159+
}
160+
if (totalServerRequests > 0) {
161+
long avgRequestTime = totalServerRequestTime / totalServerRequests;
162+
builder.field("server_request_avg_millis", avgRequestTime);
163+
if (params.paramAsBoolean("human", false)) {
164+
builder.field("server_request_avg_time", TimeValue.timeValueMillis(avgRequestTime).toString());
165+
}
166+
}
167+
if (totalServerBatches > 0) {
168+
long avgBatchTime = totalServerBatchTime / totalServerBatches;
169+
builder.field("server_batch_avg_millis", avgBatchTime);
170+
if (params.paramAsBoolean("human", false)) {
171+
builder.field("server_batch_avg_time", TimeValue.timeValueMillis(avgBatchTime).toString());
172+
}
173+
}
174+
if (totalClientBatches > 0) {
175+
long avgClientBatchTime = totalClientBatchTime / totalClientBatches;
176+
builder.field("client_batch_avg_millis", avgClientBatchTime);
177+
if (params.paramAsBoolean("human", false)) {
178+
builder.field("client_batch_avg_time", TimeValue.timeValueMillis(avgClientBatchTime).toString());
179+
}
180+
}
101181
builder.endObject();
102182

183+
// Reliability stats
103184
builder.startObject("reliability");
104-
builder.field("total_stream_errors", totalStreamErrors);
105-
if (totalServerRequests > 0) {
106-
builder.field("cluster_error_rate_percent", (totalStreamErrors * 100.0) / totalServerRequests);
185+
builder.field("client_application_errors", totalClientApplicationErrors);
186+
builder.field("client_transport_errors", totalClientTransportErrors);
187+
builder.field("server_application_errors", totalServerApplicationErrors);
188+
builder.field("server_transport_errors", totalServerTransportErrors);
189+
builder.field("client_streams_completed", totalClientStreamsCompleted);
190+
builder.field("server_streams_completed", totalServerStreamsCompleted);
191+
builder.field("cluster_uptime_millis", totalUptime);
192+
if (params.paramAsBoolean("human", false)) {
193+
builder.field("cluster_uptime", TimeValue.timeValueMillis(totalUptime).toString());
194+
}
195+
196+
long totalErrors = totalClientApplicationErrors + totalClientTransportErrors + totalServerApplicationErrors
197+
+ totalServerTransportErrors;
198+
long totalStreams = totalClientStreamsCompleted + totalServerStreamsCompleted + totalErrors;
199+
if (totalStreams > 0) {
200+
builder.field("cluster_error_rate_percent", (totalErrors * 100.0) / totalStreams);
201+
builder.field(
202+
"cluster_success_rate_percent",
203+
((totalClientStreamsCompleted + totalServerStreamsCompleted) * 100.0) / totalStreams
204+
);
107205
}
108206
builder.endObject();
109207

110-
// Resource utilization stats are per-node only
208+
// Resource utilization stats
209+
builder.startObject("resource_utilization");
210+
builder.field("arrow_allocated_bytes_total", totalArrowAllocated);
211+
if (params.paramAsBoolean("human", false)) {
212+
builder.field("arrow_allocated_total", new ByteSizeValue(totalArrowAllocated).toString());
213+
}
214+
builder.field("arrow_peak_bytes_max", totalArrowPeak);
215+
if (params.paramAsBoolean("human", false)) {
216+
builder.field("arrow_peak_max", new ByteSizeValue(totalArrowPeak).toString());
217+
}
218+
builder.field("direct_memory_bytes_total", totalDirectMemory);
219+
if (params.paramAsBoolean("human", false)) {
220+
builder.field("direct_memory_total", new ByteSizeValue(totalDirectMemory).toString());
221+
}
222+
builder.field("client_threads_active", totalClientThreadsActive);
223+
builder.field("client_threads_total", totalClientThreadsTotal);
224+
builder.field("server_threads_active", totalServerThreadsActive);
225+
builder.field("server_threads_total", totalServerThreadsTotal);
226+
builder.field("connections_active", totalConnections);
227+
builder.field("channels_active", totalChannels);
228+
if (totalClientThreadsTotal > 0) {
229+
builder.field("client_thread_utilization_percent", (totalClientThreadsActive * 100.0) / totalClientThreadsTotal);
230+
}
231+
if (totalServerThreadsTotal > 0) {
232+
builder.field("server_thread_utilization_percent", (totalServerThreadsActive * 100.0) / totalServerThreadsTotal);
233+
}
234+
builder.endObject();
111235
}
112236
}

0 commit comments

Comments
 (0)