9696import org .elasticsearch .cluster .ClusterStateListener ;
9797import org .elasticsearch .cluster .ESAllocationTestCase ;
9898import org .elasticsearch .cluster .NodeConnectionsService ;
99+ import org .elasticsearch .cluster .SnapshotDeletionsInProgress ;
99100import org .elasticsearch .cluster .SnapshotsInProgress ;
100101import org .elasticsearch .cluster .action .index .MappingUpdatedAction ;
101102import org .elasticsearch .cluster .action .index .NodeMappingRefreshAction ;
@@ -407,6 +408,49 @@ public void testSnapshotWithNodeDisconnects() {
407408 assertThat (snapshotIds , hasSize (1 ));
408409 }
409410
411+ public void testSnapshotDeleteWithMasterFailover () {
412+ final int dataNodes = randomIntBetween (2 , 10 );
413+ final int masterNodes = randomFrom (3 , 5 );
414+ setupTestCluster (masterNodes , dataNodes );
415+
416+ String repoName = "repo" ;
417+ String snapshotName = "snapshot" ;
418+ final String index = "test" ;
419+ final int shards = randomIntBetween (1 , 10 );
420+
421+ final boolean waitForSnapshot = randomBoolean ();
422+ final StepListener <CreateSnapshotResponse > createSnapshotResponseStepListener = new StepListener <>();
423+ continueOrDie (createRepoAndIndex (repoName , index , shards ), createIndexResponse ->
424+ testClusterNodes .randomMasterNodeSafe ().client .admin ().cluster ().prepareCreateSnapshot (repoName , snapshotName )
425+ .setWaitForCompletion (waitForSnapshot ).execute (createSnapshotResponseStepListener ));
426+
427+ final AtomicBoolean snapshotDeleteResponded = new AtomicBoolean (false );
428+ continueOrDie (createSnapshotResponseStepListener , createSnapshotResponse -> {
429+ scheduleNow (this ::disconnectOrRestartMasterNode );
430+ testClusterNodes .randomDataNodeSafe ().client .admin ().cluster ()
431+ .prepareDeleteSnapshot (repoName , snapshotName ).execute (ActionListener .wrap (() -> snapshotDeleteResponded .set (true )));
432+ });
433+
434+ runUntil (() -> testClusterNodes .randomMasterNode ().map (master -> {
435+ if (snapshotDeleteResponded .get () == false ) {
436+ return false ;
437+ }
438+ final SnapshotDeletionsInProgress snapshotDeletionsInProgress =
439+ master .clusterService .state ().custom (SnapshotDeletionsInProgress .TYPE );
440+ return snapshotDeletionsInProgress == null || snapshotDeletionsInProgress .getEntries ().isEmpty ();
441+ }).orElse (false ), TimeUnit .MINUTES .toMillis (1L ));
442+
443+ clearDisruptionsAndAwaitSync ();
444+
445+ final TestClusterNodes .TestClusterNode randomMaster = testClusterNodes .randomMasterNode ()
446+ .orElseThrow (() -> new AssertionError ("expected to find at least one active master node" ));
447+ SnapshotsInProgress finalSnapshotsInProgress = randomMaster .clusterService .state ().custom (SnapshotsInProgress .TYPE );
448+ assertThat (finalSnapshotsInProgress .entries (), empty ());
449+ final Repository repository = randomMaster .repositoriesService .repository (repoName );
450+ Collection <SnapshotId > snapshotIds = getRepositoryData (repository ).getSnapshotIds ();
451+ assertThat (snapshotIds , hasSize (0 ));
452+ }
453+
410454 public void testConcurrentSnapshotCreateAndDelete () {
411455 setupTestCluster (randomFrom (1 , 3 , 5 ), randomIntBetween (2 , 10 ));
412456
@@ -1149,6 +1193,8 @@ private final class TestClusterNode {
11491193
11501194 private final ClusterService clusterService ;
11511195
1196+ private final NodeConnectionsService nodeConnectionsService ;
1197+
11521198 private final RepositoriesService repositoriesService ;
11531199
11541200 private final SnapshotsService snapshotsService ;
@@ -1300,6 +1346,8 @@ public void onFailure(final Exception e) {
13001346 new BatchedRerouteService (clusterService , allocationService ::reroute ),
13011347 threadPool
13021348 );
1349+ nodeConnectionsService =
1350+ new NodeConnectionsService (clusterService .getSettings (), threadPool , transportService );
13031351 @ SuppressWarnings ("rawtypes" )
13041352 Map <ActionType , TransportAction > actions = new HashMap <>();
13051353 actions .put (GlobalCheckpointSyncAction .TYPE ,
@@ -1453,6 +1501,7 @@ public void stop() {
14531501 testClusterNodes .disconnectNode (this );
14541502 indicesService .close ();
14551503 clusterService .close ();
1504+ nodeConnectionsService .stop ();
14561505 indicesClusterStateService .close ();
14571506 if (coordinator != null ) {
14581507 coordinator .close ();
@@ -1477,10 +1526,9 @@ public void start(ClusterState initialState) {
14771526 new BatchedRerouteService (clusterService , allocationService ::reroute ), ElectionStrategy .DEFAULT_INSTANCE );
14781527 masterService .setClusterStatePublisher (coordinator );
14791528 coordinator .start ();
1480- masterService .start ();
1481- clusterService .getClusterApplierService ().setNodeConnectionsService (
1482- new NodeConnectionsService (clusterService .getSettings (), threadPool , transportService ));
1483- clusterService .getClusterApplierService ().start ();
1529+ clusterService .getClusterApplierService ().setNodeConnectionsService (nodeConnectionsService );
1530+ nodeConnectionsService .start ();
1531+ clusterService .start ();
14841532 indicesService .start ();
14851533 indicesClusterStateService .start ();
14861534 coordinator .startInitialJoin ();
0 commit comments