Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -400,13 +400,14 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
final var snapshotName = randomIdentifier();
final var snapshotFuture = startFullSnapshotBlockedOnDataNode(snapshotName, repoName, primaryNode);

final var updateSnapshotStatusBarrier = new CyclicBarrier(2);
final var updateSnapshotStatusRequestArrived = new CountDownLatch(1);
final var releaseUpdateSnapshotStatusRequests = new CountDownLatch(1);
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
masterTransportService.addRequestHandlingBehavior(
TransportUpdateSnapshotStatusAction.NAME,
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
safeAwait(updateSnapshotStatusBarrier);
safeAwait(updateSnapshotStatusBarrier);
updateSnapshotStatusRequestArrived.countDown();
safeAwait(releaseUpdateSnapshotStatusRequests);
try {
handler.messageReceived(request, channel, task);
} catch (Exception e) {
@@ -419,7 +420,8 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
addUnassignedShardsWatcher(clusterService, indexName);
putShutdownForRemovalMetadata(primaryNode, clusterService);
unblockAllDataNodes(repoName); // lets the shard snapshot pause, but allocation filtering stops it from moving
safeAwait(updateSnapshotStatusBarrier); // wait for data node to notify master that the shard snapshot is paused
safeAwait(updateSnapshotStatusRequestArrived); // wait for data node to notify master that the shard snapshot is paused (and any
// other updates to arrive)

// abort snapshot (and wait for the abort to land in the cluster state)
final var deleteStartedListener = ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> {
@@ -434,7 +436,8 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
final var deleteSnapshotFuture = startDeleteSnapshot(repoName, snapshotName); // abort the snapshot
safeAwait(deleteStartedListener);

safeAwait(updateSnapshotStatusBarrier); // process pause notification now that the snapshot is ABORTED
releaseUpdateSnapshotStatusRequests.countDown(); // release all blocked update_snapshot_status requests so they can complete
masterTransportService.clearAllRules(); // allow any further requests to use the real handler

assertEquals(SnapshotState.FAILED, snapshotFuture.get(10, TimeUnit.SECONDS).getSnapshotInfo().state());
assertTrue(deleteSnapshotFuture.get(10, TimeUnit.SECONDS).isAcknowledged());
@@ -466,6 +469,7 @@ public void testShutdownWhileSuccessInFlight() throws Exception {
SnapshotState.SUCCESS,
startFullSnapshot(repoName, randomIdentifier()).get(10, TimeUnit.SECONDS).getSnapshotInfo().state()
);
masterTransportService.clearAllRules();
clearShutdownMetadata(clusterService);
}

@@ -607,6 +611,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception {

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();
masterTransportService.clearAllRules();

// Wait for the snapshot to fully pause.
safeAwait(snapshotPausedListener);
Expand Down