Skip to content

Commit 65941c6

Browse files
Rishikesh1159github-actions[bot]
authored andcommitted
Adding CheckpointRefreshListener to trigger when Segment replication is turned on and Primary shard refreshes (#3108)
* Intial PR adding classes and tests related to checkpoint publishing Signed-off-by: Rishikesh1159 <[email protected]> * Putting a Draft PR with all changes in classes. Testing is still not included in this commit. Signed-off-by: Rishikesh1159 <[email protected]> * Wiring up index shard to new engine, spotless apply and removing unnecessary tests and logs Signed-off-by: Rishikesh1159 <[email protected]> * Adding Unit test for checkpointRefreshListener Signed-off-by: Rishikesh1159 <[email protected]> * Applying spotless check Signed-off-by: Rishikesh1159 <[email protected]> * Fixing import statements * Signed-off-by: Rishikesh1159 <[email protected]> * removing unused constructor in index shard Signed-off-by: Rishikesh1159 <[email protected]> * Addressing comments from last commit Signed-off-by: Rishikesh1159 <[email protected]> * Adding package-info.java files for two new packages Signed-off-by: Rishikesh1159 <[email protected]> * Adding test for null checkpoint publisher and addreesing PR comments Signed-off-by: Rishikesh1159 <[email protected]> * Add docs for indexshardtests and remove shard.refresh Signed-off-by: Rishikesh1159 <[email protected]> (cherry picked from commit fd5a38d)
1 parent 053cf55 commit 65941c6

File tree

19 files changed

+814
-10
lines changed

19 files changed

+814
-10
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.opensearch.indices.IndicesService;
8585
import org.opensearch.indices.breaker.CircuitBreakerService;
8686
import org.opensearch.indices.recovery.RecoveryState;
87+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
8788
import org.opensearch.plugins.Plugin;
8889
import org.opensearch.search.builder.SearchSourceBuilder;
8990
import org.opensearch.test.DummyShardLock;
@@ -673,7 +674,8 @@ public static final IndexShard newIndexShard(
673674
Arrays.asList(listeners),
674675
() -> {},
675676
RetentionLeaseSyncer.EMPTY,
676-
cbs
677+
cbs,
678+
SegmentReplicationCheckpointPublisher.EMPTY
677679
);
678680
}
679681

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
9595
import org.opensearch.indices.mapper.MapperRegistry;
9696
import org.opensearch.indices.recovery.RecoveryState;
97+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
9798
import org.opensearch.plugins.IndexStorePlugin;
9899
import org.opensearch.script.ScriptService;
99100
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
@@ -428,7 +429,8 @@ private long getAvgShardSizeInBytes() throws IOException {
428429
public synchronized IndexShard createShard(
429430
final ShardRouting routing,
430431
final Consumer<ShardId> globalCheckpointSyncer,
431-
final RetentionLeaseSyncer retentionLeaseSyncer
432+
final RetentionLeaseSyncer retentionLeaseSyncer,
433+
final SegmentReplicationCheckpointPublisher checkpointPublisher
432434
) throws IOException {
433435
Objects.requireNonNull(retentionLeaseSyncer);
434436
/*
@@ -530,7 +532,8 @@ public synchronized IndexShard createShard(
530532
indexingOperationListeners,
531533
() -> globalCheckpointSyncer.accept(shardId),
532534
retentionLeaseSyncer,
533-
circuitBreakerService
535+
circuitBreakerService,
536+
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
534537
);
535538
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
536539
eventListener.afterIndexShardCreated(indexShard);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.search.ReferenceManager;
14+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas.
20+
* This class is only used with Segment Replication enabled.
21+
*
22+
* @opensearch.internal
23+
*/
24+
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {
25+
26+
protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);
27+
28+
private final IndexShard shard;
29+
private final SegmentReplicationCheckpointPublisher publisher;
30+
31+
public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) {
32+
this.shard = shard;
33+
this.publisher = publisher;
34+
}
35+
36+
@Override
37+
public void beforeRefresh() throws IOException {
38+
// Do nothing
39+
}
40+
41+
@Override
42+
public void afterRefresh(boolean didRefresh) throws IOException {
43+
if (didRefresh) {
44+
publisher.publish(shard);
45+
}
46+
}
47+
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@
160160
import org.opensearch.indices.recovery.RecoveryListener;
161161
import org.opensearch.indices.recovery.RecoveryState;
162162
import org.opensearch.indices.recovery.RecoveryTarget;
163+
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
164+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
165+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
163166
import org.opensearch.repositories.RepositoriesService;
164167
import org.opensearch.repositories.Repository;
165168
import org.opensearch.rest.RestStatus;
@@ -299,6 +302,7 @@ Runnable getGlobalCheckpointSyncer() {
299302
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
300303
private final RefreshPendingLocationListener refreshPendingLocationListener;
301304
private volatile boolean useRetentionLeasesInPeerRecovery;
305+
private final ReferenceManager.RefreshListener checkpointRefreshListener;
302306

303307
public IndexShard(
304308
final ShardRouting shardRouting,
@@ -320,7 +324,8 @@ public IndexShard(
320324
final List<IndexingOperationListener> listeners,
321325
final Runnable globalCheckpointSyncer,
322326
final RetentionLeaseSyncer retentionLeaseSyncer,
323-
final CircuitBreakerService circuitBreakerService
327+
final CircuitBreakerService circuitBreakerService,
328+
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
324329
) throws IOException {
325330
super(shardRouting.shardId(), indexSettings);
326331
assert shardRouting.initializing();
@@ -403,6 +408,11 @@ public boolean shouldCache(Query query) {
403408
persistMetadata(path, indexSettings, shardRouting, null, logger);
404409
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
405410
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
411+
if (checkpointPublisher != null) {
412+
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
413+
} else {
414+
this.checkpointRefreshListener = null;
415+
}
406416
}
407417

408418
public ThreadPool getThreadPool() {
@@ -1363,6 +1373,21 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
13631373
}
13641374
}
13651375

1376+
/**
1377+
* Returns the lastest Replication Checkpoint that shard received
1378+
*/
1379+
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
1380+
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
1381+
}
1382+
1383+
/**
1384+
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
1385+
*/
1386+
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
1387+
assert shardRouting.primary() == false;
1388+
// TODO
1389+
}
1390+
13661391
/**
13671392
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
13681393
* without having to worry about the current state of the engine and concurrent flushes.
@@ -3118,6 +3143,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
31183143
}
31193144
};
31203145

3146+
final List<ReferenceManager.RefreshListener> internalRefreshListener;
3147+
if (this.checkpointRefreshListener != null) {
3148+
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
3149+
} else {
3150+
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
3151+
}
3152+
31213153
return this.engineConfigFactory.newEngineConfig(
31223154
shardId,
31233155
threadPool,
@@ -3134,7 +3166,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
31343166
translogConfig,
31353167
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
31363168
Arrays.asList(refreshListeners, refreshPendingLocationListener),
3137-
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
3169+
internalRefreshListener,
31383170
indexSort,
31393171
circuitBreakerService,
31403172
globalCheckpointSupplier,

server/src/main/java/org/opensearch/indices/IndicesModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.common.inject.AbstractModule;
4242
import org.opensearch.common.io.stream.NamedWriteableRegistry;
4343
import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry;
44+
import org.opensearch.common.util.FeatureFlags;
4445
import org.opensearch.common.xcontent.NamedXContentRegistry;
4546
import org.opensearch.index.mapper.BinaryFieldMapper;
4647
import org.opensearch.index.mapper.BooleanFieldMapper;
@@ -73,6 +74,7 @@
7374
import org.opensearch.index.shard.PrimaryReplicaSyncer;
7475
import org.opensearch.indices.cluster.IndicesClusterStateService;
7576
import org.opensearch.indices.mapper.MapperRegistry;
77+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
7678
import org.opensearch.indices.store.IndicesStore;
7779
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
7880
import org.opensearch.plugins.MapperPlugin;
@@ -278,6 +280,9 @@ protected void configure() {
278280
bind(RetentionLeaseSyncAction.class).asEagerSingleton();
279281
bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
280282
bind(RetentionLeaseSyncer.class).asEagerSingleton();
283+
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
284+
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
285+
}
281286
}
282287

283288
/**

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
140140
import org.opensearch.indices.recovery.RecoveryListener;
141141
import org.opensearch.indices.recovery.RecoveryState;
142+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
142143
import org.opensearch.node.Node;
143144
import org.opensearch.plugins.IndexStorePlugin;
144145
import org.opensearch.plugins.PluginsService;
@@ -843,6 +844,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
843844
@Override
844845
public IndexShard createShard(
845846
final ShardRouting shardRouting,
847+
final SegmentReplicationCheckpointPublisher checkpointPublisher,
846848
final PeerRecoveryTargetService recoveryTargetService,
847849
final RecoveryListener recoveryListener,
848850
final RepositoriesService repositoriesService,
@@ -857,7 +859,7 @@ public IndexShard createShard(
857859
IndexService indexService = indexService(shardRouting.index());
858860
assert indexService != null;
859861
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
860-
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
862+
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
861863
indexShard.addShardFailureCallback(onShardFailure);
862864
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
863865
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
8181
import org.opensearch.indices.recovery.RecoveryListener;
8282
import org.opensearch.indices.recovery.RecoveryState;
83+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
8384
import org.opensearch.indices.replication.common.ReplicationState;
8485
import org.opensearch.repositories.RepositoriesService;
8586
import org.opensearch.search.SearchService;
@@ -138,6 +139,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
138139
private final Consumer<ShardId> globalCheckpointSyncer;
139140
private final RetentionLeaseSyncer retentionLeaseSyncer;
140141

142+
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
143+
141144
@Inject
142145
public IndicesClusterStateService(
143146
final Settings settings,
@@ -153,13 +156,15 @@ public IndicesClusterStateService(
153156
final SnapshotShardsService snapshotShardsService,
154157
final PrimaryReplicaSyncer primaryReplicaSyncer,
155158
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
156-
final RetentionLeaseSyncer retentionLeaseSyncer
159+
final RetentionLeaseSyncer retentionLeaseSyncer,
160+
final SegmentReplicationCheckpointPublisher checkpointPublisher
157161
) {
158162
this(
159163
settings,
160164
indicesService,
161165
clusterService,
162166
threadPool,
167+
checkpointPublisher,
163168
recoveryTargetService,
164169
shardStateAction,
165170
nodeMappingRefreshAction,
@@ -179,6 +184,7 @@ public IndicesClusterStateService(
179184
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
180185
final ClusterService clusterService,
181186
final ThreadPool threadPool,
187+
final SegmentReplicationCheckpointPublisher checkpointPublisher,
182188
final PeerRecoveryTargetService recoveryTargetService,
183189
final ShardStateAction shardStateAction,
184190
final NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -191,6 +197,7 @@ public IndicesClusterStateService(
191197
final RetentionLeaseSyncer retentionLeaseSyncer
192198
) {
193199
this.settings = settings;
200+
this.checkpointPublisher = checkpointPublisher;
194201
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
195202
this.indicesService = indicesService;
196203
this.clusterService = clusterService;
@@ -624,6 +631,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
624631
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
625632
indicesService.createShard(
626633
shardRouting,
634+
checkpointPublisher,
627635
recoveryTargetService,
628636
new RecoveryListener(shardRouting, primaryTerm, this),
629637
repositoriesService,
@@ -981,6 +989,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
981989
*/
982990
T createShard(
983991
ShardRouting shardRouting,
992+
SegmentReplicationCheckpointPublisher checkpointPublisher,
984993
PeerRecoveryTargetService recoveryTargetService,
985994
RecoveryListener recoveryListener,
986995
RepositoriesService repositoriesService,

0 commit comments

Comments
 (0)