Skip to content

Commit 9a6def5

Browse files
mch2shiv0408
authored and committed
Fix SegmentReplicationIT.testReplicaHasDiffFilesThanPrimary for node-node replication (opensearch-project#8912)
* Fix SegmentReplicationIT.testReplicaHasDiffFilesThanPrimary. This test is now failing for node-node replication. On the primary shard the prepareForReplication method should cancel any ongoing replication if it is running and start a new sync. This is incorrectly using Map.compute which will not replace the existing handler entry in the allocationIdToHandlers map. It will only cancel the existing source handler. As a result this can leave the copyState map with an entry and hold an index commit while the test is cleaning up. The copyState is only cleared when a handler is cancelled directly or from a cluster state update. Signed-off-by: Marc Handalian <[email protected]> * PR feedback. Signed-off-by: Marc Handalian <[email protected]> --------- Signed-off-by: Marc Handalian <[email protected]> Signed-off-by: Shivansh Arora <[email protected]>
1 parent 371bc76 commit 9a6def5

File tree

2 files changed

+53
-7
lines changed

2 files changed

+53
-7
lines changed

server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,25 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
139139
*/
140140
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
141141
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
142-
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
143-
if (segrepHandler != null) {
144-
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
145-
cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
146-
}
147-
return createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter);
148-
});
142+
final SegmentReplicationSourceHandler newHandler = createTargetHandler(
143+
request.getTargetNode(),
144+
copyState,
145+
request.getTargetAllocationId(),
146+
fileChunkWriter
147+
);
148+
final SegmentReplicationSourceHandler existingHandler = allocationIdToHandlers.putIfAbsent(
149+
request.getTargetAllocationId(),
150+
newHandler
151+
);
152+
// If we are already replicating to this allocation Id, cancel the old and replace with a new execution.
153+
// This will clear the old handler & referenced copy state holding an incref'd indexCommit.
154+
if (existingHandler != null) {
155+
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
156+
cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
157+
assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()) == false;
158+
allocationIdToHandlers.put(request.getTargetAllocationId(), newHandler);
159+
}
160+
assert allocationIdToHandlers.containsKey(request.getTargetAllocationId());
149161
return copyState;
150162
}
151163

server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,4 +403,38 @@ public void testCancelForMissingIds() throws IOException {
403403
assertEquals(0, replications.cachedCopyStateSize());
404404
closeShards(replica_2);
405405
}
406+
407+
public void testPrepareForReplicationAlreadyReplicating() throws IOException {
408+
OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
409+
final String replicaAllocationId = replica.routingEntry().allocationId().getId();
410+
final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint);
411+
412+
final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
413+
414+
final SegmentReplicationSourceHandler handler = replications.getHandlers().get(replicaAllocationId);
415+
assertEquals(handler.getCopyState(), copyState);
416+
assertEquals(1, copyState.refCount());
417+
418+
ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
419+
testCheckpoint.getShardId(),
420+
testCheckpoint.getPrimaryTerm(),
421+
testCheckpoint.getSegmentsGen(),
422+
testCheckpoint.getSegmentInfosVersion() + 1,
423+
testCheckpoint.getCodec()
424+
);
425+
426+
final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
427+
1L,
428+
replicaAllocationId,
429+
primaryDiscoveryNode,
430+
secondCheckpoint
431+
);
432+
433+
final CopyState secondCopyState = replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));
434+
final SegmentReplicationSourceHandler secondHandler = replications.getHandlers().get(replicaAllocationId);
435+
assertEquals(secondHandler.getCopyState(), secondCopyState);
436+
assertEquals("New copy state is incref'd", 1, secondCopyState.refCount());
437+
assertEquals("Old copy state is cleaned up", 0, copyState.refCount());
438+
439+
}
406440
}

0 commit comments

Comments
 (0)