Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add Segment Replication backpressure rejection stats to _nodes/stats #10656

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.SegmentReplicationRejectionStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
Expand Down Expand Up @@ -129,6 +130,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchBackpressureStats searchBackpressureStats;

@Nullable
private SegmentReplicationRejectionStats segmentReplicationRejectionStats;

@Nullable
private ClusterManagerThrottlingStats clusterManagerThrottlingStats;

Expand Down Expand Up @@ -211,6 +215,12 @@ public NodeStats(StreamInput in) throws IOException {
} else {
resourceUsageStats = null;
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
segmentReplicationRejectionStats = in.readOptionalWriteable(SegmentReplicationRejectionStats::new);
} else {
segmentReplicationRejectionStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
repositoriesStats = in.readOptionalWriteable(RepositoriesStats::new);
} else {
Expand Down Expand Up @@ -244,6 +254,7 @@ public NodeStats(
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats
) {
super(node);
Expand Down Expand Up @@ -271,6 +282,7 @@ public NodeStats(
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
}

Expand Down Expand Up @@ -415,6 +427,10 @@ public SearchPipelineStats getSearchPipelineStats() {
}

@Nullable
public SegmentReplicationRejectionStats getSegmentReplicationRejectionStats() {
return segmentReplicationRejectionStats;
}

public RepositoriesStats getRepositoriesStats() {
return repositoriesStats;
}
Expand Down Expand Up @@ -465,6 +481,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(resourceUsageStats);
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(segmentReplicationRejectionStats);
}
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(repositoriesStats);
}
Expand Down Expand Up @@ -561,6 +581,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getResourceUsageStats() != null) {
getResourceUsageStats().toXContent(builder, params);
}
if (getSegmentReplicationRejectionStats() != null) {
getSegmentReplicationRejectionStats().toXContent(builder, params);
}

if (getRepositoriesStats() != null) {
getRepositoriesStats().toXContent(builder, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public enum Metric {
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories");

private String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ public SegmentReplicationPressureService(
ClusterService clusterService,
IndicesService indicesService,
ShardStateAction shardStateAction,
SegmentReplicationStatsTracker tracker,
ThreadPool threadPool
) {
this.indicesService = indicesService;
this.tracker = new SegmentReplicationStatsTracker(this.indicesService);
this.tracker = tracker;
this.shardStateAction = shardStateAction;
this.threadPool = threadPool;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Segment replication rejection stats.
*
* @opensearch.internal
*/
public class SegmentReplicationRejectionStats implements Writeable, ToXContentFragment {
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Total rejections due to segment replication backpressure
*/
private long totalRejectionCount;

public SegmentReplicationRejectionStats(final long totalRejectionCount) {
this.totalRejectionCount = totalRejectionCount;
}

public SegmentReplicationRejectionStats(StreamInput in) throws IOException {
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.totalRejectionCount = in.readVLong();
}
}

public long getTotalRejectionCount() {
return totalRejectionCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("segment_replication_backpressure");
builder.field("total_rejected_requests", totalRejectionCount);
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(totalRejectionCount);
}
}

@Override
public String toString() {
return "SegmentReplicationRejectionStats{ totalRejectedRequestCount=" + totalRejectionCount + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public SegmentReplicationStatsTracker(IndicesService indicesService) {
rejectionCount = ConcurrentCollections.newConcurrentMap();
}

public SegmentReplicationRejectionStats getTotalRejectionStats() {
return new SegmentReplicationRejectionStats(this.rejectionCount.values().stream().mapToInt(AtomicInteger::get).sum());
}

protected Map<ShardId, AtomicInteger> getRejectionCount() {
return rejectionCount;
}

public SegmentReplicationStats getStats() {
Map<ShardId, SegmentReplicationPerGroupStats> stats = new HashMap<>();
for (IndexService indexService : indicesService) {
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
Expand Down Expand Up @@ -977,6 +978,7 @@ protected Node(
transportService.getTaskManager()
);

final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class),
Expand Down Expand Up @@ -1116,6 +1118,7 @@ protected Node(
fileCache,
taskCancellationMonitoringService,
resourceUsageCollectorService,
segmentReplicationStatsTracker,
repositoryService
);

Expand Down Expand Up @@ -1246,6 +1249,7 @@ protected Node(
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
});
injector = modules.createInjector();

Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.discovery.Discovery;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
Expand Down Expand Up @@ -96,6 +97,8 @@ public class NodeService implements Closeable {
private final TaskCancellationMonitoringService taskCancellationMonitoringService;
private final RepositoriesService repositoriesService;

private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

NodeService(
Settings settings,
ThreadPool threadPool,
Expand All @@ -119,6 +122,7 @@ public class NodeService implements Closeable {
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService,
SegmentReplicationStatsTracker segmentReplicationStatsTracker,
RepositoriesService repositoriesService
) {
this.settings = settings;
Expand Down Expand Up @@ -146,6 +150,7 @@ public class NodeService implements Closeable {
this.repositoriesService = repositoriesService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
}

public NodeInfo info(
Expand Down Expand Up @@ -226,6 +231,7 @@ public NodeStats stats(
boolean taskCancellation,
boolean searchPipelineStats,
boolean resourceUsageStats,
boolean segmentReplicationTrackerStats,
boolean repositoriesStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
Expand Down Expand Up @@ -256,6 +262,7 @@ public NodeStats stats(
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationRejectionStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.RemoteTranslogStats;
Expand Down Expand Up @@ -417,6 +418,17 @@ public void testSerialization() throws IOException {
assertEquals(aResourceUsageStats.getTimestamp(), bResourceUsageStats.getTimestamp());
});
}
SegmentReplicationRejectionStats segmentReplicationRejectionStats = nodeStats.getSegmentReplicationRejectionStats();
SegmentReplicationRejectionStats deserializedSegmentReplicationRejectionStats = deserializedNodeStats
.getSegmentReplicationRejectionStats();
if (segmentReplicationRejectionStats == null) {
assertNull(deserializedSegmentReplicationRejectionStats);
} else {
assertEquals(
segmentReplicationRejectionStats.getTotalRejectionCount(),
deserializedSegmentReplicationRejectionStats.getTotalRejectionCount()
);
}
ScriptCacheStats scriptCacheStats = nodeStats.getScriptCacheStats();
ScriptCacheStats deserializedScriptCacheStats = deserializedNodeStats.getScriptCacheStats();
if (scriptCacheStats == null) {
Expand Down Expand Up @@ -812,6 +824,11 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
}
nodesResourceUsageStats = new NodesResourceUsageStats(resourceUsageStatsMap);
}
SegmentReplicationRejectionStats segmentReplicationRejectionStats = null;
if (frequently()) {
segmentReplicationRejectionStats = new SegmentReplicationRejectionStats(randomNonNegativeLong());
}

ClusterManagerThrottlingStats clusterManagerThrottlingStats = null;
if (frequently()) {
clusterManagerThrottlingStats = new ClusterManagerThrottlingStats();
Expand Down Expand Up @@ -853,6 +870,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
null,
null,
null,
segmentReplicationRejectionStats,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -220,6 +221,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -248,6 +250,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -307,6 +310,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -335,6 +339,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -363,6 +368,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ private SegmentReplicationPressureService buildPressureService(Settings settings
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));

return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class));
return new SegmentReplicationPressureService(
settings,
clusterService,
indicesService,
shardStateAction,
new SegmentReplicationStatsTracker(indicesService),
mock(ThreadPool.class)
);
}
}
Loading