diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java index 79e5563b5d9a8..c8c90832a8a5d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java @@ -400,13 +400,14 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception { final var snapshotName = randomIdentifier(); final var snapshotFuture = startFullSnapshotBlockedOnDataNode(snapshotName, repoName, primaryNode); - final var updateSnapshotStatusBarrier = new CyclicBarrier(2); + final var updateSnapshotStatusRequestArrived = new CountDownLatch(1); + final var releaseUpdateSnapshotStatusRequests = new CountDownLatch(1); final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); masterTransportService.addRequestHandlingBehavior( TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> { - safeAwait(updateSnapshotStatusBarrier); - safeAwait(updateSnapshotStatusBarrier); + updateSnapshotStatusRequestArrived.countDown(); + safeAwait(releaseUpdateSnapshotStatusRequests); try { handler.messageReceived(request, channel, task); } catch (Exception e) { @@ -419,7 +420,8 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception { addUnassignedShardsWatcher(clusterService, indexName); putShutdownForRemovalMetadata(primaryNode, clusterService); unblockAllDataNodes(repoName); // lets the shard snapshot pause, but allocation filtering stops it from moving - safeAwait(updateSnapshotStatusBarrier); // wait for data node to notify master that the shard snapshot is paused + safeAwait(updateSnapshotStatusRequestArrived); // wait for data node to notify master that the shard snapshot is paused (and any + // other updates to arrive) // abort snapshot (and wait for the abort to land in the cluster state) final var deleteStartedListener = ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> { @@ -434,7 +436,8 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception { final var deleteSnapshotFuture = startDeleteSnapshot(repoName, snapshotName); // abort the snapshot safeAwait(deleteStartedListener); - safeAwait(updateSnapshotStatusBarrier); // process pause notification now that the snapshot is ABORTED + releaseUpdateSnapshotStatusRequests.countDown(); // release all blocked update_snapshot_status requests so they can complete + masterTransportService.clearAllRules(); // allow any further requests to use the real handler assertEquals(SnapshotState.FAILED, snapshotFuture.get(10, TimeUnit.SECONDS).getSnapshotInfo().state()); assertTrue(deleteSnapshotFuture.get(10, TimeUnit.SECONDS).isAcknowledged()); @@ -466,6 +469,7 @@ public void testShutdownWhileSuccessInFlight() throws Exception { SnapshotState.SUCCESS, startFullSnapshot(repoName, randomIdentifier()).get(10, TimeUnit.SECONDS).getSnapshotInfo().state() ); + masterTransportService.clearAllRules(); clearShutdownMetadata(clusterService); } @@ -607,6 +611,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception { // Release the master node to respond snapshotStatusUpdateLatch.countDown(); + masterTransportService.clearAllRules(); // Wait for the snapshot to fully pause. safeAwait(snapshotPausedListener);