Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for search replica to return segrep stats #16678

Merged
merged 17 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
Expand All @@ -27,7 +33,7 @@ public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
useRemoteStore = true;
vinaykpud marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -55,7 +61,7 @@ public void teardown() {
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
Expand All @@ -82,4 +88,47 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> nodes = internalCluster().startDataOnlyNodes(2);
createIndex(
INDEX_NAME,
Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen(INDEX_NAME);

final int docCount = 5;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, nodes);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();

// Verify the number of indices
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size());
// Verify total shards
assertEquals(2, segmentReplicationStatsResponse.getTotalShards());
// Verify the number of primary shards
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size());

SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
// Verify the number of replica stats
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replicaStat : replicaStats) {
assertNotNull(replicaStat.getCurrentReplicationState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand All @@ -38,7 +37,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Transport action for shard segment replication operation. This transport action does not actually
Expand Down Expand Up @@ -96,11 +97,11 @@ protected SegmentReplicationStatsResponse newResponse(
) {
String[] shards = request.shards();
final List<Integer> shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList());

// organize replica responses by allocationId.
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
Expand All @@ -109,6 +110,7 @@ protected SegmentReplicationStatsResponse newResponse(
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
Expand All @@ -126,15 +128,20 @@ protected SegmentReplicationStatsResponse newResponse(
}
}
}
// combine the replica stats to the shard stat entry in each group.
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) {
replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null));
}
}
}
return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);

Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = primaryStats.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue()
.stream()
.map(groupStats -> updateGroupStats(groupStats, replicaStats))
.collect(Collectors.toList())
)
);

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, replicationStats, shardFailures);
}

@Override
Expand All @@ -144,9 +151,8 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws

@Override
protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
return null;
Expand All @@ -156,11 +162,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
}

@Override
Expand All @@ -181,4 +183,83 @@ protected ClusterBlockException checkRequestBlock(
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationPerGroupStats updateGroupStats(
SegmentReplicationPerGroupStats groupStats,
Map<String, SegmentReplicationState> replicaStats
) {
// Update the SegmentReplicationState for each of the replicas
Set<SegmentReplicationShardStats> updatedReplicaStats = groupStats.getReplicaStats()
.stream()
.peek(replicaStat -> replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null)))
.collect(Collectors.toSet());

// Compute search replica stats
Set<SegmentReplicationShardStats> searchReplicaStats = computeSearchReplicaStats(groupStats.getShardId(), replicaStats);

// Combine ReplicaStats and SearchReplicaStats
Set<SegmentReplicationShardStats> combinedStats = Stream.concat(updatedReplicaStats.stream(), searchReplicaStats.stream())
.collect(Collectors.toSet());

return new SegmentReplicationPerGroupStats(groupStats.getShardId(), combinedStats, groupStats.getRejectedRequestCount());
}

private Set<SegmentReplicationShardStats> computeSearchReplicaStats(
ShardId shardId,
Map<String, SegmentReplicationState> replicaStats
) {
return replicaStats.values()
.stream()
.filter(segmentReplicationState -> segmentReplicationState.getShardRouting().shardId().equals(shardId))
.filter(segmentReplicationState -> segmentReplicationState.getShardRouting().isSearchOnly())
.map(segmentReplicationState -> {
ShardRouting shardRouting = segmentReplicationState.getShardRouting();
SegmentReplicationShardStats segmentReplicationStats = computeSegmentReplicationShardStats(shardRouting);
segmentReplicationStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationStats;
})
.collect(Collectors.toSet());
}

SegmentReplicationShardStats computeSegmentReplicationShardStats(ShardRouting shardRouting) {
ShardId shardId = shardRouting.shardId();
SegmentReplicationState completedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(shardId);
vinaykpud marked this conversation as resolved.
Show resolved Hide resolved
SegmentReplicationState ongoingSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);

return new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
0,
calculateBytesRemainingToReplicate(ongoingSegmentReplicationState),
0,
getCurrentReplicationLag(ongoingSegmentReplicationState),
getLastCompletedReplicationLag(completedSegmentReplicationState)
);
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
if (isActiveOnly) {
return targetService.getOngoingEventSegmentReplicationState(shardId);
} else {
return targetService.getSegmentReplicationState(shardId);
}
}

private long calculateBytesRemainingToReplicate(SegmentReplicationState ongoingSegmentReplicationState) {
if (ongoingSegmentReplicationState == null) {
return 0;
}
return ongoingSegmentReplicationState.getIndex()
.fileDetails()
.stream()
.mapToLong(index -> index.length() - index.recovered())
.sum();
}

private long getCurrentReplicationLag(SegmentReplicationState ongoingSegmentReplicationState) {
return ongoingSegmentReplicationState != null ? ongoingSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(SegmentReplicationState completedSegmentReplicationState) {
return completedSegmentReplicationState != null ? completedSegmentReplicationState.getTimer().time() : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
this.currentReplicationState = in.readOptionalWriteable(SegmentReplicationState::new);

Check warning on line 67 in server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java#L67

Added line #L67 was not covered by tests
}

public String getAllocationId() {
Expand Down Expand Up @@ -134,6 +135,7 @@
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
out.writeOptionalWriteable(currentReplicationState);

Check warning on line 138 in server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java#L138

Added line #L138 was not covered by tests
vinaykpud marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Loading
Loading