Skip to content

Commit 6c75f6d

Browse files
committed
Adding test for null checkpoint publisher and addreesing PR comments
Signed-off-by: Rishikesh1159 <[email protected]>
1 parent bd17e4f commit 6c75f6d

File tree

7 files changed

+22
-19
lines changed

7 files changed

+22
-19
lines changed

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -533,9 +533,7 @@ public synchronized IndexShard createShard(
533533
() -> globalCheckpointSyncer.accept(shardId),
534534
retentionLeaseSyncer,
535535
circuitBreakerService,
536-
this.indexSettings.isSegRepEnabled() && routing.primary()
537-
? checkpointPublisher
538-
: SegmentReplicationCheckpointPublisher.EMPTY
536+
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
539537
);
540538
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
541539
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@
161161
import org.opensearch.indices.recovery.RecoveryTarget;
162162
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
163163
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
164-
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
164+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
165165
import org.opensearch.repositories.RepositoriesService;
166166
import org.opensearch.repositories.Repository;
167167
import org.opensearch.rest.RestStatus;
@@ -324,7 +324,7 @@ public IndexShard(
324324
final Runnable globalCheckpointSyncer,
325325
final RetentionLeaseSyncer retentionLeaseSyncer,
326326
final CircuitBreakerService circuitBreakerService,
327-
final SegmentReplicationCheckpointPublisher checkpointPublisher
327+
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
328328
) throws IOException {
329329
super(shardRouting.shardId(), indexSettings);
330330
assert shardRouting.initializing();
@@ -407,7 +407,7 @@ public boolean shouldCache(Query query) {
407407
persistMetadata(path, indexSettings, shardRouting, null, logger);
408408
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
409409
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
410-
if (indexSettings.isSegRepEnabled() == true) {
410+
if (checkpointPublisher != null) {
411411
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
412412
} else {
413413
this.checkpointRefreshListener = null;
@@ -1372,10 +1372,16 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
13721372
}
13731373
}
13741374

1375+
/**
1376+
* Returns the lastest Replication Checkpoint that shard received
1377+
*/
13751378
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
13761379
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
13771380
}
13781381

1382+
/**
1383+
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
1384+
*/
13791385
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
13801386
assert shardRouting.primary() == false;
13811387
// TODO

server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.opensearch.action.support.replication.ReplicationRequest;
1212
import org.opensearch.common.io.stream.StreamInput;
1313
import org.opensearch.common.io.stream.StreamOutput;
14-
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
1514

1615
import java.io.IOException;
1716

server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java renamed to server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.indices.replication.copy;
9+
package org.opensearch.indices.replication.checkpoint;
1010

1111
import org.opensearch.common.Nullable;
1212
import org.opensearch.common.io.stream.StreamInput;

server/src/main/java/org/opensearch/indices/replication/copy/package-info.java

Lines changed: 0 additions & 10 deletions
This file was deleted.

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3440,6 +3440,17 @@ public void testCheckpointRefreshListener() throws IOException {
34403440
closeShards(shard);
34413441
}
34423442

3443+
/**
3444+
* here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List
3445+
*/
3446+
public void testCheckpointRefreshListenerWithNull() throws IOException {
3447+
IndexShard shard = newStartedShard(p -> newShard(null), true);
3448+
shard.refresh("test");
3449+
List<ReferenceManager.RefreshListener> refreshListeners = shard.getEngine().config().getInternalRefreshListener();
3450+
assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener));
3451+
closeShards(shard);
3452+
}
3453+
34433454
/**
34443455
* creates a new initializing shard. The shard will will be put in its proper path under the
34453456
* current node id the shard is assigned to.

server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.opensearch.index.shard.ShardId;
2424
import org.opensearch.indices.IndicesService;
2525
import org.opensearch.indices.recovery.RecoverySettings;
26-
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
2726
import org.opensearch.test.OpenSearchTestCase;
2827
import org.opensearch.test.transport.CapturingTransport;
2928
import org.opensearch.threadpool.TestThreadPool;

0 commit comments

Comments
 (0)