Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
aa13d96
support local merged segment warmer
guojialiang92 May 12, 2025
1c1f802
update log
guojialiang92 May 12, 2025
58528e9
fix test
guojialiang92 May 12, 2025
7c8449e
add tests
guojialiang92 May 13, 2025
e327372
IndexShard support getActiveReplicaNodes
guojialiang92 May 13, 2025
290c5b9
add test
guojialiang92 May 13, 2025
f356225
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 3, 2025
ddb4258
add pre-verification.
guojialiang92 Jun 3, 2025
d63a676
extend ReplicationAction instead of ActionRequest
guojialiang92 Jun 3, 2025
c8625bb
reuse Store#getSegmentMetadataMap
guojialiang92 Jun 4, 2025
80e5372
refactor updateReplicationRateLimiter
guojialiang92 Jun 4, 2025
73d6616
extract an abstract base class (AbstractSegmentReplicationTarget) fro…
guojialiang92 Jun 4, 2025
b04b73b
reduce unnecessary exception judgment
guojialiang92 Jun 4, 2025
5d18ed1
fix UT
guojialiang92 Jun 4, 2025
779b532
add PublishMergedSegmentActionTests
guojialiang92 Jun 5, 2025
897d0bb
gradle spotlessApply
guojialiang92 Jun 5, 2025
f04152d
add test
guojialiang92 Jun 5, 2025
61ba203
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 25, 2025
1d83140
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 25, 2025
9f5f4c0
rename ReplicationSegmentCheckpoint to MergeSegmentCheckpoint
guojialiang92 Jun 25, 2025
01839ab
add some description
guojialiang92 Jun 25, 2025
d558c85
refactor code
guojialiang92 Jun 25, 2025
96fc56c
extract an abstract base class (AbstractPublishCheckpointAction) from…
guojialiang92 Jun 25, 2025
a0b4e3a
fix :server:japicmp
guojialiang92 Jun 25, 2025
cc5ea0b
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 26, 2025
dedad5e
update
guojialiang92 Jun 26, 2025
3224f35
update
guojialiang92 Jun 26, 2025
f5111ec
Merge remote-tracking branch 'origin/main' into dev/support-local-mer…
guojialiang92 Jun 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -729,7 +730,8 @@ public static final IndexShard newIndexShard(
OpenSearchTestCase::randomBoolean,
() -> indexService.getIndexSettings().getRefreshInterval(),
indexService.getRefreshMutex(),
clusterService.getClusterApplierService()
clusterService.getClusterApplierService(),
MergedSegmentPublisher.EMPTY
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * Runs the segment replication integration test suite with the merged segment warmer feature flag enabled,
 * and adds scenarios that exercise the pre-copy of merged segments.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MergedSegmentWarmerIT extends SegmentReplicationIT {
    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
    }

    /**
     * Turns on the experimental merged segment warmer feature flag for every node in the test cluster.
     */
    @Override
    protected Settings featureFlagSettings() {
        Settings.Builder featureSettings = Settings.builder();
        featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
        return featureSettings.build();
    }

    /**
     * After a force merge, every GET_SEGMENT_FILES request handled by the primary node must ask for zero files,
     * because the merged segment files are expected to have been pre-copied already.
     */
    public void testMergeSegmentWarmer() throws Exception {
        // Bring the nodes up one at a time so the primary shard is allocated on primaryNode. If both nodes
        // were up before index creation, the primary could land on either node and the handler registered
        // below might never be invoked, letting the test pass without checking anything.
        final String primaryNode = internalCluster().startDataOnlyNode();
        createIndex(INDEX_NAME);
        ensureYellowAndNoInitializingShards(INDEX_NAME);
        final String replicaNode = internalCluster().startDataOnlyNode();
        ensureGreen(INDEX_NAME);

        indexDocsWithImmediateRefresh(30);

        waitForSearchableDocs(30, primaryNode, replicaNode);

        MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
            TransportService.class,
            primaryNode
        ));

        primaryTransportService.addRequestHandlingBehavior(
            SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES,
            (handler, request, channel, task) -> {
                final GetSegmentFilesRequest filesRequest = (GetSegmentFilesRequest) request;
                logger.info(
                    "replicationId {}, get segment files {}",
                    filesRequest.getReplicationId(),
                    filesRequest.getFilesToFetch().stream().map(StoreFileMetadata::name).collect(Collectors.toList())
                );
                // Once the merged segment has been pre-copied its files are reused, so nothing is left to fetch.
                assertEquals(0, filesRequest.getFilesToFetch().size());
                handler.messageReceived(request, channel, task);
            }
        );

        try {
            client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));

            waitForSegmentCount(INDEX_NAME, 2, logger);
        } finally {
            // Always drop the stubbed behavior, even when the wait above fails.
            primaryTransportService.clearAllRules();
        }
    }

    /**
     * Holds back merged-segment file chunks sent from the primary node until more than two
     * GET_MERGED_SEGMENT_FILES requests have been received, then waits for both copies to reach two segments.
     */
    public void testConcurrentMergeSegmentWarmer() throws Exception {
        final String primaryNode = internalCluster().startDataOnlyNode();
        createIndex(
            INDEX_NAME,
            Settings.builder()
                .put(indexSettings())
                .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
                .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
                .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
                .build()
        );
        ensureYellowAndNoInitializingShards(INDEX_NAME);
        final String replicaNode = internalCluster().startDataOnlyNode();
        ensureGreen(INDEX_NAME);

        // ensure pre-copy merge segment concurrent execution
        AtomicInteger getMergeSegmentFilesActionCount = new AtomicInteger(0);
        MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
            TransportService.class,
            primaryNode
        ));

        CountDownLatch blockFileCopy = new CountDownLatch(1);
        primaryTransportService.addRequestHandlingBehavior(
            SegmentReplicationSourceService.Actions.GET_MERGED_SEGMENT_FILES,
            (handler, request, channel, task) -> {
                final GetSegmentFilesRequest filesRequest = (GetSegmentFilesRequest) request;
                logger.info(
                    "replicationId {}, get merge segment files {}",
                    filesRequest.getReplicationId(),
                    filesRequest.getFilesToFetch().stream().map(StoreFileMetadata::name).collect(Collectors.toList())
                );
                // Test the value returned by the increment itself instead of issuing a separate read.
                if (getMergeSegmentFilesActionCount.incrementAndGet() > 2) {
                    blockFileCopy.countDown();
                }
                handler.messageReceived(request, channel, task);
            }
        );

        primaryTransportService.addSendBehavior(
            internalCluster().getInstance(TransportService.class, replicaNode),
            (connection, requestId, action, request, options) -> {
                if (action.equals(SegmentReplicationTargetService.Actions.MERGED_SEGMENT_FILE_CHUNK)) {
                    try {
                        blockFileCopy.await();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for whoever owns this thread before propagating.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }

                connection.sendRequest(requestId, action, request, options);
            }
        );

        try {
            indexDocsWithImmediateRefresh(30);

            client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));

            waitForSegmentCount(INDEX_NAME, 2, logger);
        } finally {
            // Release any sender still parked on the latch, then remove the stubs, so a failed run
            // cannot leave threads blocked behind the send behavior.
            blockFileCopy.countDown();
            primaryTransportService.clearAllRules();
        }
    }

    /**
     * Runs a force merge down to one segment on a cluster with a single data node and waits for it to return.
     */
    public void testMergeSegmentWarmerWithInactiveReplica() throws Exception {
        internalCluster().startDataOnlyNode();
        createIndex(INDEX_NAME);
        ensureYellowAndNoInitializingShards(INDEX_NAME);

        indexDocsWithImmediateRefresh(30);

        // Blocking on the response makes the test fail (or time out) if the force merge does not complete.
        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
        final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
        // NOTE(review): this counts the shard entries reported for the index, not the segments inside them.
        // Confirm whether a segment-count check was intended here.
        assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
    }

    /**
     * Waits up to one minute until the primary copies and the replica copies of the index each report exactly
     * {@code segmentCount} distinct segment names.
     *
     * @param indexName    index whose segments are inspected
     * @param segmentCount expected number of distinct segment names on each side
     * @param logger       logger that receives the observed segment names on every attempt
     * @throws Exception if the expected counts are not reached within the time limit
     */
    public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
        assertBusy(() -> {
            Set<String> primarySegments = Sets.newHashSet();
            Set<String> replicaSegments = Sets.newHashSet();
            final IndicesSegmentResponse response = client().admin().indices().prepareSegments(indexName).get();
            for (IndexShardSegments indexShardSegments : response.getIndices().get(indexName).getShards().values()) {
                for (ShardSegments shardSegment : indexShardSegments.getShards()) {
                    // Every segment of one shard copy goes to the same bucket, so pick the bucket once per copy.
                    final Set<String> bucket = shardSegment.getShardRouting().primary() ? primarySegments : replicaSegments;
                    for (Segment segment : shardSegment.getSegments()) {
                        bucket.add(segment.getName());
                    }
                }
            }
            logger.info("primary segments: {}, replica segments: {}", primarySegments, replicaSegments);
            assertEquals(segmentCount, primarySegments.size());
            assertEquals(segmentCount, replicaSegments.size());
        }, 1, TimeUnit.MINUTES);
    }

    /**
     * Indexes {@code docCount} single-field documents with ids {@code 0..docCount-1} into the test index,
     * requesting an immediate refresh after each one.
     */
    private void indexDocsWithImmediateRefresh(int docCount) {
        for (int i = 0; i < docCount; i++) {
            client().prepareIndex(INDEX_NAME)
                .setId(String.valueOf(i))
                .setSource("foo" + i, "bar" + i)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .get();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -599,7 +600,8 @@ public synchronized IndexShard createShard(
final DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode,
DiscoveryNodes discoveryNodes,
MergedSegmentWarmerFactory mergedSegmentWarmerFactory
MergedSegmentWarmerFactory mergedSegmentWarmerFactory,
MergedSegmentPublisher mergedSegmentPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -731,7 +733,8 @@ protected void closeInternal() {
fixedRefreshIntervalSchedulingEnabled,
this::getRefreshInterval,
refreshMutex,
clusterService.getClusterApplierService()
clusterService.getClusterApplierService(),
this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;

Expand All @@ -49,15 +52,26 @@ public class LocalMergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
private final TransportService transportService;
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final IndexShard indexShard;

public LocalMergedSegmentWarmer(TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService) {
public LocalMergedSegmentWarmer(
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
IndexShard indexShard
) {
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.indexShard = indexShard;
}

@Override
public void warm(LeafReader leafReader) throws IOException {
    // This hook is invoked from IndexWriter#mergeMiddle, so the reader handed in is expected to be a SegmentReader.
    assert leafReader instanceof SegmentReader;

    // Hand the commit info of the freshly merged segment to the shard for publication.
    final SegmentReader mergedSegmentReader = (SegmentReader) leafReader;
    final SegmentCommitInfo mergedSegmentInfo = mergedSegmentReader.getSegmentInfo();
    indexShard.publishMergedSegment(mergedSegmentInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
if (shard.indexSettings().isAssignedOnRemoteNode()) {
return new RemoteStoreMergedSegmentWarmer(transportService, recoverySettings, clusterService);
} else if (shard.indexSettings().isSegRepLocalEnabled()) {
return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService);
return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
} else if (shard.indexSettings().isDocumentReplication()) {
// MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication,
// IndexWriter.IndexReaderWarmer should be null.
Expand Down
Loading
Loading