Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,14 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
final var snapshotName = randomIdentifier();
final var snapshotFuture = startFullSnapshotBlockedOnDataNode(snapshotName, repoName, primaryNode);

final var updateSnapshotStatusBarrier = new CyclicBarrier(2);
final var updateSnapshotStatusRequestArrived = new CountDownLatch(1);
final var releaseUpdateSnapshotStatusRequests = new CountDownLatch(1);
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
masterTransportService.addRequestHandlingBehavior(
TransportUpdateSnapshotStatusAction.NAME,
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
safeAwait(updateSnapshotStatusBarrier);
safeAwait(updateSnapshotStatusBarrier);
updateSnapshotStatusRequestArrived.countDown();
safeAwait(releaseUpdateSnapshotStatusRequests);
try {
handler.messageReceived(request, channel, task);
} catch (Exception e) {
Expand All @@ -425,7 +426,8 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
addUnassignedShardsWatcher(clusterService, indexName);
putShutdownForRemovalMetadata(primaryNode, clusterService);
unblockAllDataNodes(repoName); // lets the shard snapshot pause, but allocation filtering stops it from moving
safeAwait(updateSnapshotStatusBarrier); // wait for data node to notify master that the shard snapshot is paused
safeAwait(updateSnapshotStatusRequestArrived); // wait for data node to notify master that the shard snapshot is paused (and any
// other updates to arrive)

// abort snapshot (and wait for the abort to land in the cluster state)
final var deleteStartedListener = ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> {
Expand All @@ -440,7 +442,8 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
final var deleteSnapshotFuture = startDeleteSnapshot(repoName, snapshotName); // abort the snapshot
safeAwait(deleteStartedListener);

safeAwait(updateSnapshotStatusBarrier); // process pause notification now that the snapshot is ABORTED
releaseUpdateSnapshotStatusRequests.countDown(); // release all blocked update_snapshot_status requests so they can complete
masterTransportService.clearAllRules(); // allow any further requests to use the real handler

assertEquals(SnapshotState.FAILED, snapshotFuture.get(10, TimeUnit.SECONDS).getSnapshotInfo().state());
assertTrue(deleteSnapshotFuture.get(10, TimeUnit.SECONDS).isAcknowledged());
Expand Down Expand Up @@ -472,6 +475,7 @@ public void testShutdownWhileSuccessInFlight() throws Exception {
SnapshotState.SUCCESS,
startFullSnapshot(repoName, randomIdentifier()).get(10, TimeUnit.SECONDS).getSnapshotInfo().state()
);
masterTransportService.clearAllRules();
clearShutdownMetadata(clusterService);
}

Expand Down Expand Up @@ -613,6 +617,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception {

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();
masterTransportService.clearAllRules();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if this is the wrong place for this call. I put it here because it is the line immediately following the countdown of snapshotStatusUpdateLatch: at that point the update snapshot status requests are allowed to be processed, so the rules can be removed.


// Wait for the snapshot to fully pause.
safeAwait(snapshotPausedListener);
Expand Down