Skip to content

Commit

Permalink
Added support for search replica to return segrep stats (#16678) (#16842
Browse files Browse the repository at this point in the history
)

* Added implementation for the stats calculation for search and regular replica in shards



* Updated changelog



* Added unit tests for TransportSegmentReplicationStatsAction



* fixed java style after running precommit locally



* refined the test cases



* fixed style issues



* Made changes in the bytes to download calculation based on comments



* added addReplicaStats method to SegmentReplicationPerGroupStats



* fixed style issues



* Fixed issue with immutable set



* Fixed PR comments and moved the integration tests to separate module



* Fixed failing integ tests



* Fixed failing integ test



* fixed some comments for PR



* fixed failing tests



---------


(cherry picked from commit b67cdf4)

Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent d8f7e48 commit dfebb8f
Show file tree
Hide file tree
Showing 4 changed files with 744 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
Expand Down Expand Up @@ -82,4 +88,47 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> nodes = internalCluster().startDataOnlyNodes(2);
createIndex(
INDEX_NAME,
Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen(INDEX_NAME);

final int docCount = 5;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, nodes);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();

// Verify the number of indices
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size());
// Verify total shards
assertEquals(2, segmentReplicationStatsResponse.getTotalShards());
// Verify the number of primary shards
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size());

SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
// Verify the number of replica stats
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replicaStat : replicaStats) {
assertNotNull(replicaStat.getCurrentReplicationState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand All @@ -38,7 +37,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Transport action for shard segment replication operation. This transport action does not actually
Expand Down Expand Up @@ -96,11 +97,11 @@ protected SegmentReplicationStatsResponse newResponse(
) {
String[] shards = request.shards();
final List<Integer> shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList());

// organize replica responses by allocationId.
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
Expand All @@ -109,6 +110,7 @@ protected SegmentReplicationStatsResponse newResponse(
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
Expand All @@ -126,15 +128,20 @@ protected SegmentReplicationStatsResponse newResponse(
}
}
}
// combine the replica stats to the shard stat entry in each group.
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) {
replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null));
}
}
}
return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);

Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = primaryStats.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue()
.stream()
.map(groupStats -> updateGroupStats(groupStats, replicaStats))
.collect(Collectors.toList())
)
);

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, replicationStats, shardFailures);
}

@Override
Expand All @@ -144,9 +151,8 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws

@Override
protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
return null;
Expand All @@ -156,11 +162,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
}

@Override
Expand All @@ -181,4 +183,83 @@ protected ClusterBlockException checkRequestBlock(
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationPerGroupStats updateGroupStats(
SegmentReplicationPerGroupStats groupStats,
Map<String, SegmentReplicationState> replicaStats
) {
// Update the SegmentReplicationState for each of the replicas
Set<SegmentReplicationShardStats> updatedReplicaStats = groupStats.getReplicaStats()
.stream()
.peek(replicaStat -> replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null)))
.collect(Collectors.toSet());

// Compute search replica stats
Set<SegmentReplicationShardStats> searchReplicaStats = computeSearchReplicaStats(groupStats.getShardId(), replicaStats);

// Combine ReplicaStats and SearchReplicaStats
Set<SegmentReplicationShardStats> combinedStats = Stream.concat(updatedReplicaStats.stream(), searchReplicaStats.stream())
.collect(Collectors.toSet());

return new SegmentReplicationPerGroupStats(groupStats.getShardId(), combinedStats, groupStats.getRejectedRequestCount());
}

private Set<SegmentReplicationShardStats> computeSearchReplicaStats(
ShardId shardId,
Map<String, SegmentReplicationState> replicaStats
) {
return replicaStats.values()
.stream()
.filter(segmentReplicationState -> segmentReplicationState.getShardRouting().shardId().equals(shardId))
.filter(segmentReplicationState -> segmentReplicationState.getShardRouting().isSearchOnly())
.map(segmentReplicationState -> {
ShardRouting shardRouting = segmentReplicationState.getShardRouting();
SegmentReplicationShardStats segmentReplicationStats = computeSegmentReplicationShardStats(shardRouting);
segmentReplicationStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationStats;
})
.collect(Collectors.toSet());
}

SegmentReplicationShardStats computeSegmentReplicationShardStats(ShardRouting shardRouting) {
ShardId shardId = shardRouting.shardId();
SegmentReplicationState completedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(shardId);
SegmentReplicationState ongoingSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);

return new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
0,
calculateBytesRemainingToReplicate(ongoingSegmentReplicationState),
0,
getCurrentReplicationLag(ongoingSegmentReplicationState),
getLastCompletedReplicationLag(completedSegmentReplicationState)
);
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
if (isActiveOnly) {
return targetService.getOngoingEventSegmentReplicationState(shardId);
} else {
return targetService.getSegmentReplicationState(shardId);
}
}

private long calculateBytesRemainingToReplicate(SegmentReplicationState ongoingSegmentReplicationState) {
if (ongoingSegmentReplicationState == null) {
return 0;
}
return ongoingSegmentReplicationState.getIndex()
.fileDetails()
.stream()
.mapToLong(index -> index.length() - index.recovered())
.sum();
}

private long getCurrentReplicationLag(SegmentReplicationState ongoingSegmentReplicationState) {
return ongoingSegmentReplicationState != null ? ongoingSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(SegmentReplicationState completedSegmentReplicationState) {
return completedSegmentReplicationState != null ? completedSegmentReplicationState.getTimer().time() : 0;
}
}
Loading

0 comments on commit dfebb8f

Please sign in to comment.