Merged

28 commits
aa13d96
support local merged segment warmer
guojialiang92 May 12, 2025
1c1f802
update log
guojialiang92 May 12, 2025
58528e9
fix test
guojialiang92 May 12, 2025
7c8449e
add tests
guojialiang92 May 13, 2025
e327372
IndexShard support getActiveReplicaNodes
guojialiang92 May 13, 2025
290c5b9
add test
guojialiang92 May 13, 2025
f356225
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 3, 2025
ddb4258
add pre-verification.
guojialiang92 Jun 3, 2025
d63a676
extend ReplicationAction instead of ActionRequest
guojialiang92 Jun 3, 2025
c8625bb
reuse Store#getSegmentMetadataMap
guojialiang92 Jun 4, 2025
80e5372
refactor updateReplicationRateLimiter
guojialiang92 Jun 4, 2025
73d6616
extract an abstract base class (AbstractSegmentReplicationTarget) fro…
guojialiang92 Jun 4, 2025
b04b73b
reduce unnecessary exception judgment
guojialiang92 Jun 4, 2025
5d18ed1
fix UT
guojialiang92 Jun 4, 2025
779b532
add PublishMergedSegmentActionTests
guojialiang92 Jun 5, 2025
897d0bb
gradle spotlessApply
guojialiang92 Jun 5, 2025
f04152d
add test
guojialiang92 Jun 5, 2025
61ba203
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 25, 2025
1d83140
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 25, 2025
9f5f4c0
rename ReplicationSegmentCheckpoint to MergeSegmentCheckpoint
guojialiang92 Jun 25, 2025
01839ab
add some description
guojialiang92 Jun 25, 2025
d558c85
refactor code
guojialiang92 Jun 25, 2025
96fc56c
extract an abstract base class (AbstractPublishCheckpointAction) from…
guojialiang92 Jun 25, 2025
a0b4e3a
fix :server:japicmp
guojialiang92 Jun 25, 2025
cc5ea0b
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 26, 2025
dedad5e
update
guojialiang92 Jun 26, 2025
3224f35
update
guojialiang92 Jun 26, 2025
f5111ec
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 26, 2025
@@ -0,0 +1,199 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* This class runs the Segment Replication integration test suite with the merged segment warmer enabled.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MergedSegmentWarmerIT extends SegmentReplicationIT {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
return featureSettings.build();
}

public void testMergeSegmentWarmer() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
final String replicaNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

for (int i = 0; i < 30; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo" + i, "bar" + i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}

waitForSearchableDocs(30, primaryNode, replicaNode);

MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));

primaryTransportService.addRequestHandlingBehavior(
SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES,
(handler, request, channel, task) -> {
logger.info(
"replicationId {}, get segment files {}",
((GetSegmentFilesRequest) request).getReplicationId(),
((GetSegmentFilesRequest) request).getFilesToFetch().stream().map(StoreFileMetadata::name).collect(Collectors.toList())
);
// After pre-copy of the merged segment completes, the merged segment files are reused, so there are no files to fetch.
assertEquals(0, ((GetSegmentFilesRequest) request).getFilesToFetch().size());
handler.messageReceived(request, channel, task);
}
);

client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));

waitForSegmentCount(INDEX_NAME, 2, logger);
primaryTransportService.clearAllRules();
}

public void testConcurrentMergeSegmentWarmer() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
.build()
);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// ensure that pre-copy of merged segments executes concurrently
AtomicInteger getMergeSegmentFilesActionCount = new AtomicInteger(0);
MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));

CountDownLatch blockFileCopy = new CountDownLatch(1);
primaryTransportService.addRequestHandlingBehavior(
SegmentReplicationSourceService.Actions.GET_MERGED_SEGMENT_FILES,
(handler, request, channel, task) -> {
logger.info(
"replicationId {}, get merge segment files {}",
((GetSegmentFilesRequest) request).getReplicationId(),
((GetSegmentFilesRequest) request).getFilesToFetch().stream().map(StoreFileMetadata::name).collect(Collectors.toList())
);
getMergeSegmentFilesActionCount.incrementAndGet();
if (getMergeSegmentFilesActionCount.get() > 2) {
blockFileCopy.countDown();
}
handler.messageReceived(request, channel, task);
}
);

primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.MERGED_SEGMENT_FILE_CHUNK)) {
try {
blockFileCopy.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

connection.sendRequest(requestId, action, request, options);
}
);

for (int i = 0; i < 30; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo" + i, "bar" + i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}

client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));

waitForSegmentCount(INDEX_NAME, 2, logger);
primaryTransportService.clearAllRules();
}

public void testMergeSegmentWarmerWithInactiveReplica() throws Exception {
internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);

for (int i = 0; i < 30; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo" + i, "bar" + i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}

client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
}

public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
assertBusy(() -> {
Set<String> primarySegments = Sets.newHashSet();
Set<String> replicaSegments = Sets.newHashSet();
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(indexName).get();
for (IndexShardSegments indexShardSegments : response.getIndices().get(indexName).getShards().values()) {
for (ShardSegments shardSegment : indexShardSegments.getShards()) {
for (Segment segment : shardSegment.getSegments()) {
if (shardSegment.getShardRouting().primary()) {
primarySegments.add(segment.getName());
} else {
replicaSegments.add(segment.getName());
}
}
}
}
logger.info("primary segments: {}, replica segments: {}", primarySegments, replicaSegments);
assertEquals(segmentCount, primarySegments.size());
assertEquals(segmentCount, replicaSegments.size());
}, 1, TimeUnit.MINUTES);
}
}
@@ -314,6 +314,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
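The two settings added above, RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING and RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING, control the transfer rate limit and the wait timeout for pre-copying merged segments to replicas. As a rough sketch of how such dynamic, node-scoped settings are typically declared, the snippet below follows the usual OpenSearch Setting pattern; the setting keys, defaults, and constant names in it are illustrative assumptions, not the definitions from this PR.

```java
// Sketch only: keys, defaults, and constant names are assumptions, not taken from this PR.
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

public final class MergedSegmentReplicationSettingsSketch {

    // Throttle for copying pre-merged segments to replicas (hypothetical key and default).
    public static final Setting<ByteSizeValue> MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC =
        Setting.byteSizeSetting(
            "indices.merged_segment_replication.max_bytes_per_sec",
            new ByteSizeValue(40, ByteSizeUnit.MB),
            Property.Dynamic,
            Property.NodeScope
        );

    // Upper bound on how long the primary waits for replicas to finish warming a merged segment
    // (hypothetical key and default).
    public static final Setting<TimeValue> MERGED_SEGMENT_REPLICATION_TIMEOUT =
        Setting.positiveTimeSetting(
            "indices.merged_segment_replication.timeout",
            TimeValue.timeValueMinutes(15),
            Property.Dynamic,
            Property.NodeScope
        );

    private MergedSegmentReplicationSettingsSketch() {}
}
```

Registering the settings in the built-in cluster settings list, as the hunk above does, is what makes them validated at startup and, because they are declared Dynamic, updatable at runtime through the cluster settings update API.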
server/src/main/java/org/opensearch/index/engine/LocalMergedSegmentWarmer.java
@@ -32,32 +32,108 @@

package org.opensearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.PublishMergedSegmentRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationSegmentCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Implementation of an {@link IndexWriter.IndexReaderWarmer} used when local on-disk segment replication is enabled.
*
* @opensearch.internal
*/
public class LocalMergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
private static final Logger logger = LogManager.getLogger(LocalMergedSegmentWarmer.class);
private final TransportService transportService;
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final IndexShard indexShard;

public LocalMergedSegmentWarmer(TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService) {
public LocalMergedSegmentWarmer(
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
IndexShard indexShard
) {
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.indexShard = indexShard;
}

@Override
public void warm(LeafReader leafReader) throws IOException {
// TODO: node-node segment replication merged segment warmer
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
ReplicationSegmentCheckpoint mergedSegment = indexShard.computeReplicationSegmentCheckpoint(segmentCommitInfo);
PublishMergedSegmentRequest request = new PublishMergedSegmentRequest(mergedSegment);

DiscoveryNodes nodes = clusterService.state().nodes();
List<ShardRouting> replicaShards = indexShard.getReplicationGroup().getRoutingTable().replicaShards();
List<DiscoveryNode> activeReplicaNodes = replicaShards.stream()
.filter(ShardRouting::active)
.map(s -> nodes.get(s.currentNodeId()))
.toList();

if (activeReplicaNodes.isEmpty()) {
logger.trace("There are no active replicas, skip pre copy merged segment [{}]", segmentCommitInfo.info.name);
return;
}

CountDownLatch countDownLatch = new CountDownLatch(activeReplicaNodes.size());
AtomicInteger successfulCount = new AtomicInteger(0);
AtomicInteger failureCount = new AtomicInteger(0);
for (DiscoveryNode replicaNode : activeReplicaNodes) {
ActionListener<TransportResponse> listener = ActionListener.wrap(r -> {
successfulCount.incrementAndGet();
countDownLatch.countDown();
}, e -> {
failureCount.incrementAndGet();
countDownLatch.countDown();
});

transportService.sendRequest(
replicaNode,
SegmentReplicationTargetService.Actions.PUBLISH_MERGED_SEGMENT,
request,
new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)
);
}
try {
countDownLatch.await(recoverySettings.getMergedSegmentReplicationTimeout().seconds(), TimeUnit.SECONDS);
logger.trace(
"pre copy merged segment [{}] to [{}] active replicas, [{}] successful, [{}] failed",
segmentCommitInfo.info.name,
activeReplicaNodes.size(),
successfulCount,
failureCount
);
} catch (InterruptedException e) {
logger.warn(
() -> new ParameterizedMessage("Interrupted while waiting for pre copy merged segment [{}]", segmentCommitInfo.info.name),
e
);
}
}
}
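For background on when warm(LeafReader) runs: Lucene invokes a configured merged-segment warmer after a merge has finished writing the new segment but before that segment is made visible to searchers, which is the window LocalMergedSegmentWarmer uses to pre-copy merged segment files to active replicas. The standalone sketch below uses only the plain Lucene API to show that hook; the lambda body is a placeholder for the checkpoint computation and transport fan-out implemented above, not the actual OpenSearch wiring.

```java
// Plain-Lucene sketch of the merged segment warmer hook; the warmer body is a placeholder.
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.ByteBuffersDirectory;

public class MergedSegmentWarmerSketch {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig config = new IndexWriterConfig();
        config.setReaderPooling(true); // ensure the warmer is applied even without an open NRT reader
        // Lucene calls this after a merge finishes writing, before the merged segment becomes searchable.
        config.setMergedSegmentWarmer(leafReader -> {
            String segmentName = ((SegmentReader) leafReader).getSegmentInfo().info.name;
            // LocalMergedSegmentWarmer would compute a ReplicationSegmentCheckpoint here and
            // publish it to the active replicas before the merge commits.
            System.out.println("warming merged segment " + segmentName + " with " + leafReader.maxDoc() + " docs");
        });

        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), config)) {
            for (int i = 0; i < 10; i++) {
                Document doc = new Document();
                doc.add(new StringField("id", Integer.toString(i), Field.Store.YES));
                writer.addDocument(doc);
                writer.flush(); // create several small segments so the force-merge below has work to do
            }
            writer.forceMerge(1); // triggers the warmer for the merged segment
        }
    }
}
```

In the PR itself this wiring is handled on the OpenSearch side (the warmer instance comes from MergedSegmentWarmerFactory, shown in the next hunk), so the sketch is only meant to illustrate where in the merge lifecycle warm() is called.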
@@ -61,7 +61,7 @@ public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
if (shard.indexSettings().isAssignedOnRemoteNode()) {
return new RemoteStoreMergedSegmentWarmer(transportService, recoverySettings, clusterService);
} else if (shard.indexSettings().isSegRepLocalEnabled()) {
return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService);
return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
} else if (shard.indexSettings().isDocumentReplication()) {
// MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication,
// IndexWriter.IndexReaderWarmer should be null.
32 changes: 32 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -41,6 +41,7 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
@@ -198,6 +199,7 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.ReplicationSegmentCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.repositories.RepositoriesService;
@@ -1845,6 +1847,26 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th
return checkpoint;
}

/**
* Compute {@link ReplicationSegmentCheckpoint} from a SegmentCommitInfo.
* This method fetches a metadata snapshot from the store, which comes with an IO cost.
*
* @param segmentCommitInfo the {@link SegmentCommitInfo} to compute the checkpoint from.
* @return {@link ReplicationSegmentCheckpoint} computed from the segmentCommitInfo.
* @throws IOException When there is an error computing segment metadata from the store.
*/
public ReplicationSegmentCheckpoint computeReplicationSegmentCheckpoint(SegmentCommitInfo segmentCommitInfo) throws IOException {
Map<String, StoreFileMetadata> segmentMetadataMap = store.getSegmentMetadataMap(segmentCommitInfo);
return new ReplicationSegmentCheckpoint(
shardId,
getOperationPrimaryTerm(),
segmentMetadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
segmentMetadataMap,
segmentCommitInfo.info.name
);
}

/**
* Checks if this target shard should start a round of segment replication.
* @return - True if the shard is able to perform segment replication.
@@ -1909,6 +1931,16 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
return true;
}

/**
* Checks if a merged segment checkpoint should be processed
*
* @param requestCheckpoint received merged segment checkpoint that is checked for processing
* @return true if the merged segment checkpoint should be processed
*/
public final boolean shouldProcessMergedSegmentCheckpoint(ReplicationCheckpoint requestCheckpoint) {
return isSegmentReplicationAllowed() && requestCheckpoint.getPrimaryTerm() >= getOperationPrimaryTerm();
}

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.