diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index b8618bb0ae801..bf3a478436168 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -574,6 +574,19 @@ public void applyClusterState(ClusterChangedEvent event) { } catch (Exception e) { logger.warn("Failed to update snapshot state ", e); } + assert assertConsistentWithClusterState(event.state()); + } + + private boolean assertConsistentWithClusterState(ClusterState state) { + final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) { + final Set runningSnapshots = + snapshotsInProgress.entries().stream().map(SnapshotsInProgress.Entry::snapshot).collect(Collectors.toSet()); + final Set snapshotListenerKeys = snapshotCompletionListeners.keySet(); + assert runningSnapshots.containsAll(snapshotListenerKeys) : "Saw completion listeners for unknown snapshots in " + + snapshotListenerKeys + " but running snapshots are " + runningSnapshots; + } + return true; } /** diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index b1f985d9d7583..16f1f37697095 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -96,6 +96,7 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.NodeConnectionsService; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; @@ -407,6 +408,49 @@ public void testSnapshotWithNodeDisconnects() { assertThat(snapshotIds, hasSize(1)); } + public void testSnapshotDeleteWithMasterFailover() { + final int dataNodes = randomIntBetween(2, 10); + final int masterNodes = randomFrom(3, 5); + setupTestCluster(masterNodes, dataNodes); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + final boolean waitForSnapshot = randomBoolean(); + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> + testClusterNodes.randomMasterNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(waitForSnapshot).execute(createSnapshotResponseStepListener)); + + final AtomicBoolean snapshotDeleteResponded = new AtomicBoolean(false); + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { + scheduleNow(this::disconnectOrRestartMasterNode); + testClusterNodes.randomDataNodeSafe().client.admin().cluster() + .prepareDeleteSnapshot(repoName, snapshotName).execute(ActionListener.wrap(() -> snapshotDeleteResponded.set(true))); + }); + + runUntil(() -> testClusterNodes.randomMasterNode().map(master -> { + if (snapshotDeleteResponded.get() == false) { + return false; + } + final SnapshotDeletionsInProgress snapshotDeletionsInProgress = + master.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE); + return snapshotDeletionsInProgress == null || snapshotDeletionsInProgress.getEntries().isEmpty(); + }).orElse(false), TimeUnit.MINUTES.toMillis(1L)); + + clearDisruptionsAndAwaitSync(); + + final TestClusterNodes.TestClusterNode randomMaster = testClusterNodes.randomMasterNode() + .orElseThrow(() -> new AssertionError("expected to find at least one active master node")); + SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertThat(finalSnapshotsInProgress.entries(), empty()); + final Repository repository = randomMaster.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(0)); + } + public void testConcurrentSnapshotCreateAndDelete() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); @@ -1149,6 +1193,8 @@ private final class TestClusterNode { private final ClusterService clusterService; + private final NodeConnectionsService nodeConnectionsService; + private final RepositoriesService repositoriesService; private final SnapshotsService snapshotsService; @@ -1300,6 +1346,8 @@ public void onFailure(final Exception e) { new BatchedRerouteService(clusterService, allocationService::reroute), threadPool ); + nodeConnectionsService = + new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService); @SuppressWarnings("rawtypes") Map actions = new HashMap<>(); actions.put(GlobalCheckpointSyncAction.TYPE, @@ -1453,6 +1501,7 @@ public void stop() { testClusterNodes.disconnectNode(this); indicesService.close(); clusterService.close(); + nodeConnectionsService.stop(); indicesClusterStateService.close(); if (coordinator != null) { coordinator.close(); @@ -1477,10 +1526,9 @@ public void start(ClusterState initialState) { new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE); masterService.setClusterStatePublisher(coordinator); coordinator.start(); - masterService.start(); - clusterService.getClusterApplierService().setNodeConnectionsService( - new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService)); - clusterService.getClusterApplierService().start(); + clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService); + nodeConnectionsService.start(); + clusterService.start(); indicesService.start(); indicesClusterStateService.start(); coordinator.startInitialJoin();