From e8b02c9545d7908ae1a3ea2d109c06ff3e0c07a5 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Fri, 27 Sep 2024 12:02:24 +0530 Subject: [PATCH] Shallow snapshot v2 - create snapshot validations in a cluster state update (#15939) --------- Signed-off-by: Gaurav Bafna --- .../remotestore/RemoteRestoreSnapshotIT.java | 126 +++-- .../remotestore/RemoteSnapshotIT.java | 89 ++++ .../snapshots/CloneSnapshotV2IT.java | 106 ++-- .../snapshots/ConcurrentSnapshotsV2IT.java | 486 ++++++++++++++++++ .../cluster/SnapshotsInProgress.java | 141 ++++- .../snapshots/SnapshotsService.java | 410 ++++++++++----- ...SnapshotsInProgressSerializationTests.java | 6 +- 7 files changed, 1134 insertions(+), 230 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 0acb578e2e7bf..a0183e89bfce2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -11,6 +11,8 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; @@ -25,6 +27,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.Index; @@ -43,14 +46,11 @@ import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; -import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotRestoreException; import org.opensearch.snapshots.SnapshotState; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.nio.file.Files; @@ -63,6 +63,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -79,48 +80,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase { - private static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; - private Path remoteRepoPath; - - @Before - public void setup() { - remoteRepoPath = randomRepoPath().toAbsolutePath(); - } - - @After - public void teardown() { - clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get(); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath)) - .build(); - } - - private Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) { - Settings.Builder settingsBuilder = Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s"); - return settingsBuilder; - } - - private void indexDocuments(Client client, String indexName, int numOfDocs) { - indexDocuments(client, indexName, 0, numOfDocs); - } - - private void indexDocuments(Client client, String indexName, int fromId, int toId) { - for (int i = fromId; i < toId; i++) { - String id = Integer.toString(i); - client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get(); - } - client.admin().indices().prepareFlush(indexName).get(); - } +public class RemoteRestoreSnapshotIT extends RemoteSnapshotIT { private void assertDocsPresentInIndex(Client client, String indexName, int numOfDocs) { for (int i = 0; i < numOfDocs; i++) { @@ -997,6 +957,75 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); } + public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(snapshotRepoName, FsRepository.TYPE, settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + Thread thread = new Thread(() -> { + try { + String snapshotName = "snapshot-earlier-master"; + internalCluster().nonClusterManagerClient() + .admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName) + .setWaitForCompletion(true) + .setMasterNodeTimeout(TimeValue.timeValueSeconds(60)) + .get(); + + } catch (Exception ignored) {} + }); + thread.start(); + + // stop existing master + final String clusterManagerNode = internalCluster().getClusterManagerName(); + stopNode(clusterManagerNode); + + // Validate that we have greater one snapshot has been created + String snapshotName = "new-snapshot"; + try { + client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get(); + } catch (Exception e) { + logger.info("Exception while creating new-snapshot", e); + } + + // Validate that snapshot is present in repository data + assertBusy(() -> { + GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName); + GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet(); + assertThat(response2.getSnapshots().size(), greaterThanOrEqualTo(1)); + }, 30, TimeUnit.SECONDS); + thread.join(); + } + public void testCreateSnapshotV2WithRedIndex() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startDataOnlyNode(pinnedTimestampSettings()); @@ -1315,11 +1344,4 @@ public void testConcurrentV1SnapshotAndV2RepoSettingUpdate() throws Exception { createV1SnapshotThread.join(); } - private Settings pinnedTimestampSettings() { - Settings settings = Settings.builder() - .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) - .build(); - return settings; - } - } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java new file mode 100644 index 0000000000000..836871b8251d1 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSnapshotIT.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.repositories.fs.ReloadableFsRepository; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Path; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; + +public abstract class RemoteSnapshotIT extends AbstractSnapshotIntegTestCase { + protected static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; + protected Path remoteRepoPath; + + @Before + public void setup() { + remoteRepoPath = randomRepoPath().toAbsolutePath(); + } + + @After + public void teardown() { + clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath)) + .build(); + } + + protected Settings pinnedTimestampSettings() { + Settings settings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .build(); + return settings; + } + + protected Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) { + Settings.Builder settingsBuilder = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s"); + return settingsBuilder; + } + + protected void indexDocuments(Client client, String indexName, int numOfDocs) { + indexDocuments(client, indexName, 0, numOfDocs); + } + + void indexDocuments(Client client, String indexName, int fromId, int toId) { + for (int i = fromId; i < toId; i++) { + String id = Integer.toString(i); + client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get(); + } + client.admin().indices().prepareFlush(indexName).get(); + } + + protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException { + GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName }); + GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get(); + RepositoryMetadata rmd = res.repositories().get(0); + Settings.Builder settings = Settings.builder() + .put("location", rmd.settings().get("location")) + .put(REPOSITORIES_FAILRATE_SETTING.getKey(), value); + createRepository(repoName, ReloadableFsRepository.TYPE, settings); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java index c6744ae62db60..69e85b13548e0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java @@ -34,11 +34,15 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.rest.RestStatus; @@ -50,8 +54,10 @@ import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -134,27 +140,32 @@ public void testCloneShallowCopyV2() throws Exception { assertTrue(response.isAcknowledged()); awaitClusterManagerFinishRepoOperations(); + AtomicReference cloneSnapshotId = new AtomicReference<>(); // Validate that snapshot is present in repository data - PlainActionFuture repositoryDataPlainActionFutureClone = new PlainActionFuture<>(); - repository.getRepositoryData(repositoryDataPlainActionFutureClone); - - repositoryData = repositoryDataPlainActionFutureClone.get(); - assertEquals(repositoryData.getSnapshotIds().size(), 2); - boolean foundCloneInRepoData = false; - SnapshotId cloneSnapshotId = null; - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - if (snapshotId.getName().equals("test_clone_snapshot1")) { - foundCloneInRepoData = true; - cloneSnapshotId = snapshotId; + waitUntil(() -> { + PlainActionFuture repositoryDataPlainActionFutureClone = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFutureClone); + + RepositoryData repositoryData1; + try { + repositoryData1 = repositoryDataPlainActionFutureClone.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } - } - final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId; + for (SnapshotId snapshotId : repositoryData1.getSnapshotIds()) { + if (snapshotId.getName().equals("test_clone_snapshot1")) { + cloneSnapshotId.set(snapshotId); + return true; + } + } + return false; + }, 90, TimeUnit.SECONDS); + + final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId.get(); SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get( f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal))) ); - assertTrue(foundCloneInRepoData); - assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp())); for (String index : sourceSnapshotInfo.indices()) { assertTrue(cloneSnapshotInfo.indices().contains(index)); @@ -259,19 +270,21 @@ public void testCloneShallowCopyAfterDisablingV2() throws Exception { assertThat(sourceSnapshotInfoV1.state(), equalTo(SnapshotState.SUCCESS)); assertThat(sourceSnapshotInfoV1.successfulShards(), greaterThan(0)); assertThat(sourceSnapshotInfoV1.successfulShards(), equalTo(sourceSnapshotInfoV1.totalShards())); - assertThat(sourceSnapshotInfoV1.getPinnedTimestamp(), equalTo(0L)); + // assertThat(sourceSnapshotInfoV1.getPinnedTimestamp(), equalTo(0L)); + AtomicReference repositoryDataAtomicReference = new AtomicReference<>(); + awaitClusterManagerFinishRepoOperations(); // Validate that snapshot is present in repository data - PlainActionFuture repositoryDataV1PlainActionFuture = new PlainActionFuture<>(); - BlobStoreRepository repositoryV1 = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance( - RepositoriesService.class - ).repository(snapshotRepoName); - repositoryV1.getRepositoryData(repositoryDataV1PlainActionFuture); - - repositoryData = repositoryDataV1PlainActionFuture.get(); + assertBusy(() -> { + Metadata metadata = clusterAdmin().prepareState().get().getState().metadata(); + RepositoriesMetadata repositoriesMetadata = metadata.custom(RepositoriesMetadata.TYPE); + assertEquals(1, repositoriesMetadata.repository(snapshotRepoName).generation()); + assertEquals(1, repositoriesMetadata.repository(snapshotRepoName).pendingGeneration()); - assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfoV1.snapshotId())); - assertEquals(repositoryData.getSnapshotIds().size(), 2); + GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName); + GetSnapshotsResponse response = client().admin().cluster().getSnapshots(request).actionGet(); + assertEquals(2, response.getSnapshots().size()); + }, 30, TimeUnit.SECONDS); // clone should get created for v2 snapshot AcknowledgedResponse response = client().admin() @@ -289,31 +302,28 @@ public void testCloneShallowCopyAfterDisablingV2() throws Exception { ).repository(snapshotRepoName); repositoryCloneV2.getRepositoryData(repositoryDataCloneV2PlainActionFuture); - repositoryData = repositoryDataCloneV2PlainActionFuture.get(); - - assertEquals(repositoryData.getSnapshotIds().size(), 3); - boolean foundCloneInRepoData = false; - SnapshotId cloneSnapshotId = null; - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - if (snapshotId.getName().equals(cloneSnapshotV2)) { - foundCloneInRepoData = true; - cloneSnapshotId = snapshotId; - } - } - final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId; - SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get( - f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal))) - ); + // Validate that snapshot is present in repository data + assertBusy(() -> { + Metadata metadata = clusterAdmin().prepareState().get().getState().metadata(); + RepositoriesMetadata repositoriesMetadata = metadata.custom(RepositoriesMetadata.TYPE); + assertEquals(2, repositoriesMetadata.repository(snapshotRepoName).generation()); + assertEquals(2, repositoriesMetadata.repository(snapshotRepoName).pendingGeneration()); + GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName); + GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet(); + assertEquals(3, response2.getSnapshots().size()); + }, 30, TimeUnit.SECONDS); - assertTrue(foundCloneInRepoData); // pinned timestamp value in clone snapshot v2 matches source snapshot v2 - assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp())); - for (String index : sourceSnapshotInfo.indices()) { - assertTrue(cloneSnapshotInfo.indices().contains(index)); - + GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName, new String[] { sourceSnapshotV2, cloneSnapshotV2 }); + GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet(); + + SnapshotInfo sourceInfo = response2.getSnapshots().get(0); + SnapshotInfo cloneInfo = response2.getSnapshots().get(1); + assertEquals(sourceInfo.getPinnedTimestamp(), cloneInfo.getPinnedTimestamp()); + assertEquals(sourceInfo.totalShards(), cloneInfo.totalShards()); + for (String index : sourceInfo.indices()) { + assertTrue(cloneInfo.indices().contains(index)); } - assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards())); - } public void testRestoreFromClone() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java new file mode 100644 index 0000000000000..f20fddb6af26c --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsV2IT.java @@ -0,0 +1,486 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.remotestore.RemoteSnapshotIT; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryData; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class ConcurrentSnapshotsV2IT extends RemoteSnapshotIT { + + public void testLongRunningSnapshotDontAllowConcurrentSnapshot() throws Exception { + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + blockClusterManagerOnWriteIndexFile(repoName); + + final ActionFuture snapshotFuture = startFullSnapshot(repoName, "snapshot-queued"); + awaitNumberOfSnapshotsInProgress(1); + + try { + String snapshotName = "snapshot-concurrent"; + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).get(); + fail(); + } catch (Exception e) {} + + unblockNode(repoName, clusterManagerName); + CreateSnapshotResponse csr = snapshotFuture.actionGet(); + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(1, snapInfo.size()); + assertThat(snapInfo, contains(csr.getSnapshotInfo())); + } + + public void testCreateSnapshotFailInFinalize() throws Exception { + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + blockClusterManagerFromFinalizingSnapshotOnIndexFile(repoName); + final ActionFuture snapshotFuture = startFullSnapshot(repoName, "snapshot-queued"); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(clusterManagerNode, repoName, TimeValue.timeValueSeconds(30L)); + unblockNode(repoName, clusterManagerNode); + expectThrows(SnapshotException.class, snapshotFuture::actionGet); + + final ActionFuture snapshotFuture2 = startFullSnapshot(repoName, "snapshot-success"); + // Second create works out cleanly since the repo + CreateSnapshotResponse csr = snapshotFuture2.actionGet(); + + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(1, snapInfo.size()); + assertThat(snapInfo, contains(csr.getSnapshotInfo())); + } + + public void testCreateSnapshotV2MasterSwitch() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + String clusterManagerNode = internalCluster().getClusterManagerName(); + + blockClusterManagerFromFinalizingSnapshotOnIndexFile(repoName); + final ActionFuture snapshotFuture = startFullSnapshot(repoName, "snapshot-queued"); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(clusterManagerNode, repoName, TimeValue.timeValueSeconds(30L)); + + // Fail the cluster manager + stopNode(clusterManagerNode); + + ensureGreen(); + + final ActionFuture snapshotFuture2 = startFullSnapshot(repoName, "snapshot-success"); + // Second create works out cleanly since the repo + CreateSnapshotResponse csr = snapshotFuture2.actionGet(); + + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(1, snapInfo.size()); + assertThat(snapInfo, contains(csr.getSnapshotInfo())); + + } + + public void testPinnedTimestampFailSnapshot() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + // fail segment repo - this is to fail the timestamp pinning + setFailRate(BASE_REMOTE_REPO, 100); + + try { + String snapshotName = "snapshot-fail"; + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .get(); + fail(); + } catch (Exception e) {} + + setFailRate(BASE_REMOTE_REPO, 0); + String snapshotName = "snapshot-success"; + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .get(); + + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(1, snapInfo.size()); + } + + public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(snapshotRepoName, FsRepository.TYPE, settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + int concurrentSnapshots = 5; + + // Prepare threads for concurrent snapshot creation + List threads = new ArrayList<>(); + + for (int i = 0; i < concurrentSnapshots; i++) { + int snapshotIndex = i; + Thread thread = new Thread(() -> { + try { + String snapshotName = "snapshot-concurrent-" + snapshotIndex; + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName) + .setWaitForCompletion(true) + .get(); + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + } catch (Exception e) {} + }); + threads.add(thread); + } + // start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Validate that only one snapshot has been created + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); + } + + public void testLongRunningSnapshotDontAllowConcurrentClone() throws Exception { + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + String sourceSnap = "snapshot-source"; + + final CreateSnapshotResponse csr = startFullSnapshot(repoName, sourceSnap).actionGet(); + blockClusterManagerOnWriteIndexFile(repoName); + + final ActionFuture snapshotFuture = startCloneSnapshot(repoName, sourceSnap, "snapshot-clone"); + awaitNumberOfSnapshotsInProgress(1); + + final ActionFuture snapshotFuture2 = startCloneSnapshot(repoName, sourceSnap, "snapshot-clone-2"); + assertThrows(ConcurrentSnapshotExecutionException.class, snapshotFuture2::actionGet); + + unblockNode(repoName, clusterManagerName); + assertThrows(SnapshotException.class, snapshotFuture2::actionGet); + + snapshotFuture.get(); + + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(2, snapInfo.size()); + } + + public void testCloneSnapshotFailInFinalize() throws Exception { + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + String sourceSnap = "snapshot-source"; + CreateSnapshotResponse sourceResp = startFullSnapshot(repoName, sourceSnap).actionGet(); + + blockClusterManagerFromFinalizingSnapshotOnIndexFile(repoName); + final ActionFuture snapshotFuture = startCloneSnapshot(repoName, sourceSnap, "snapshot-queued"); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(clusterManagerNode, repoName, TimeValue.timeValueSeconds(30L)); + unblockNode(repoName, clusterManagerNode); + assertThrows(SnapshotException.class, snapshotFuture::actionGet); + + final ActionFuture snapshotFuture2 = startFullSnapshot(repoName, "snapshot-success"); + // Second create works out cleanly since the repo is cleaned up + CreateSnapshotResponse csr = snapshotFuture2.actionGet(); + + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(2, snapInfo.size()); + assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), sourceResp.getSnapshotInfo())); + } + + public void testCloneSnapshotV2MasterSwitch() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(repoName, "mock", settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + String sourceSnap = "snapshot-source"; + CreateSnapshotResponse csr = startFullSnapshot(repoName, sourceSnap).actionGet(); + + String clusterManagerNode = internalCluster().getClusterManagerName(); + + blockClusterManagerFromFinalizingSnapshotOnIndexFile(repoName); + final ActionFuture snapshotFuture = startCloneSnapshot(repoName, sourceSnap, "snapshot-queued"); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(clusterManagerNode, repoName, TimeValue.timeValueSeconds(30L)); + + // Fail the cluster manager + stopNode(clusterManagerNode); + + ensureGreen(); + + final ActionFuture snapshotFuture2 = startFullSnapshot(repoName, "snapshot-success"); + // Second create works out cleanly since the repo + CreateSnapshotResponse csr2 = snapshotFuture2.actionGet(); + List snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(); + assertEquals(2, snapInfo.size()); + assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), csr2.getSnapshotInfo())); + } + + protected ActionFuture startCloneSnapshot(String repoName, String sourceSnapshotName, String snapshotName) { + logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName); + return clusterAdmin().prepareCloneSnapshot(repoName, sourceSnapshotName, snapshotName).setIndices("*").execute(); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index d658f38430dd9..595f6d54fd9be 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -134,6 +134,38 @@ public static Entry startedEntry( ); } + public static Entry startedEntry( + Snapshot snapshot, + boolean includeGlobalState, + boolean partial, + List indices, + List dataStreams, + long startTime, + long repositoryStateId, + final Map shards, + Map userMetadata, + Version version, + boolean remoteStoreIndexShallowCopy, + boolean remoteStoreIndexShallowCopyV2 + ) { + return new SnapshotsInProgress.Entry( + snapshot, + includeGlobalState, + partial, + completed(shards.values()) ? State.SUCCESS : State.STARTED, + indices, + dataStreams, + startTime, + repositoryStateId, + shards, + null, + userMetadata, + version, + remoteStoreIndexShallowCopy, + remoteStoreIndexShallowCopyV2 + ); + } + /** * Creates the initial snapshot clone entry * @@ -168,8 +200,39 @@ public static Entry startClone( version, source, Map.of(), - false // initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with - // clone jobs + false, + false// initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with + // clone jobs + ); + } + + public static Entry startClone( + Snapshot snapshot, + SnapshotId source, + List indices, + long startTime, + long repositoryStateId, + Version version, + boolean remoteStoreIndexShallowCopyV2 + ) { + return new SnapshotsInProgress.Entry( + snapshot, + true, + false, + State.STARTED, + indices, + Collections.emptyList(), + startTime, + repositoryStateId, + Map.of(), + null, + Collections.emptyMap(), + version, + source, + Map.of(), + remoteStoreIndexShallowCopyV2, + remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService + // while updating entry with clone jobs ); } @@ -183,6 +246,8 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation private final Snapshot snapshot; private final boolean includeGlobalState; private final boolean remoteStoreIndexShallowCopy; + + private final boolean remoteStoreIndexShallowCopyV2; private final boolean partial; /** * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation. @@ -212,6 +277,42 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation @Nullable private final String failure; + public Entry( + Snapshot snapshot, + boolean includeGlobalState, + boolean partial, + State state, + List indices, + List dataStreams, + long startTime, + long repositoryStateId, + final Map shards, + String failure, + Map userMetadata, + Version version, + boolean remoteStoreIndexShallowCopy, + boolean remoteStoreIndexShallowCopyV2 + ) { + this( + snapshot, + includeGlobalState, + partial, + state, + indices, + dataStreams, + startTime, + repositoryStateId, + shards, + failure, + userMetadata, + version, + null, + Map.of(), + remoteStoreIndexShallowCopy, + remoteStoreIndexShallowCopyV2 + ); + } + // visible for testing, use #startedEntry and copy constructors in production code public Entry( Snapshot snapshot, @@ -243,7 +344,8 @@ public Entry( version, null, Map.of(), - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + false ); } @@ -262,7 +364,8 @@ private Entry( Version version, @Nullable SnapshotId source, @Nullable final Map clones, - boolean remoteStoreIndexShallowCopy + boolean remoteStoreIndexShallowCopy, + boolean remoteStoreIndexShallowCopyV2 ) { this.state = state; this.snapshot = snapshot; @@ -284,7 +387,9 @@ private Entry( this.clones = Collections.unmodifiableMap(clones); } this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; - assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); + this.remoteStoreIndexShallowCopyV2 = remoteStoreIndexShallowCopyV2; + assert this.remoteStoreIndexShallowCopyV2 + || assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } private Entry(StreamInput in) throws IOException { @@ -307,6 +412,11 @@ private Entry(StreamInput in) throws IOException { } else { remoteStoreIndexShallowCopy = false; } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + remoteStoreIndexShallowCopyV2 = in.readBoolean(); + } else { + remoteStoreIndexShallowCopyV2 = false; + } } private static boolean assertShardsConsistent( @@ -428,7 +538,8 @@ public Entry withRepoGen(long newRepoGen) { version, source, clones, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + remoteStoreIndexShallowCopyV2 ); } @@ -451,7 +562,8 @@ public Entry withClones(final Map update version, source, updatedClones, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + remoteStoreIndexShallowCopyV2 ); } @@ -471,7 +583,8 @@ public Entry withRemoteStoreIndexShallowCopy(final boolean remoteStoreIndexShall version, source, clones, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + remoteStoreIndexShallowCopyV2 ); } @@ -527,7 +640,8 @@ public Entry fail(final Map shards, State state, S version, source, clones, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + remoteStoreIndexShallowCopyV2 ); } @@ -614,6 +728,10 @@ public boolean remoteStoreIndexShallowCopy() { return remoteStoreIndexShallowCopy; } + public boolean remoteStoreIndexShallowCopyV2() { + return remoteStoreIndexShallowCopyV2; + } + public Map userMetadata() { return userMetadata; } @@ -678,6 +796,7 @@ public boolean equals(Object o) { if (Objects.equals(source, ((Entry) o).source) == false) return false; if (clones.equals(((Entry) o).clones) == false) return false; if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false; + if (remoteStoreIndexShallowCopyV2 != entry.remoteStoreIndexShallowCopyV2) return false; return true; } @@ -695,6 +814,7 @@ public int hashCode() { result = 31 * result + (source == null ? 0 : source.hashCode()); result = 31 * result + clones.hashCode(); result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0); + result = 31 * result + (remoteStoreIndexShallowCopyV2 ? 1 : 0); return result; } @@ -766,6 +886,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeBoolean(remoteStoreIndexShallowCopy); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeBoolean(remoteStoreIndexShallowCopyV2); + } } @Override diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 23f6deff3715d..22b2a72b36026 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -468,26 +468,33 @@ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionLi long pinnedTimestamp = System.currentTimeMillis(); final String repositoryName = request.repository(); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); - validate(repositoryName, snapshotName); - final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot Repository repository = repositoriesService.repository(repositoryName); + validate(repositoryName, snapshotName); + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) { + private SnapshotsInProgress.Entry newEntry; - if (repository.isReadOnly()) { - listener.onFailure( - new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository") - ); - return; - } + private SnapshotId snapshotId; - final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); - ClusterState currentState = clusterService.state(); - final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); - try { - final StepListener repositoryDataListener = new StepListener<>(); - repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + private Snapshot snapshot; + + boolean enteredLoop; + + @Override + public ClusterState execute(ClusterState currentState) { + // move to in progress + snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + Repository repository = repositoriesService.repository(repositoryName); + + if (repository.isReadOnly()) { + listener.onFailure( + new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository") + ); + } + + snapshot = new Snapshot(repositoryName, snapshotId); + final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); - repositoryDataListener.whenComplete(repositoryData -> { createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); List indices = new ArrayList<>(currentState.metadata().indices().keySet()); @@ -498,7 +505,7 @@ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionLi request.indices() ); - logger.trace("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); + logger.info("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); @@ -509,74 +516,139 @@ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionLi IndexId.DEFAULT_SHARD_PATH_TYPE ); final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); - final ShardGenerations shardGenerations = buildShardsGenerationFromRepositoryData( - currentState.metadata(), - currentState.routingTable(), - indexIds, - repositoryData - ); if (repositoryData.getGenId() == RepositoryData.UNKNOWN_REPO_GEN) { logger.debug("[{}] was aborted before starting", snapshot); throw new SnapshotException(snapshot, "Aborted on initialization"); } + + Map shards = new HashMap<>(); + + newEntry = SnapshotsInProgress.startedEntry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), + request.partial(), + indexIds, + dataStreams, + threadPool.absoluteTimeInMillis(), + repositoryData.getGenId(), + shards, + userMeta, + version, + true, + true + ); + final List newEntries = new ArrayList<>(runningSnapshots); + newEntries.add(newEntry); + + // Entering finalize loop here to prevent concurrent snapshots v2 snapshots + enteredLoop = tryEnterRepoLoop(repositoryName); + if (enteredLoop == false) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot start snapshot-v2 while a repository is in finalization state" + ); + } + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(new ArrayList<>(newEntries))) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot-v2", repositoryName, snapshotName), e); + listener.onFailure(e); + if (enteredLoop) { + leaveRepoLoop(repositoryName); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + final ShardGenerations shardGenerations = buildShardsGenerationFromRepositoryData( + newState.metadata(), + newState.routingTable(), + newEntry.indices(), + repositoryData + ); + final List dataStreams = indexNameExpressionResolver.dataStreamNames( + newState, + request.indicesOptions(), + request.indices() + ); final SnapshotInfo snapshotInfo = new SnapshotInfo( - snapshot.getSnapshotId(), + snapshotId, shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), - dataStreams, + newEntry.dataStreams(), pinnedTimestamp, null, System.currentTimeMillis(), shardGenerations.totalShards(), Collections.emptyList(), request.includeGlobalState(), - userMeta, + newEntry.userMetadata(), true, pinnedTimestamp ); - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager"); - } + final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null); final StepListener pinnedTimestampListener = new StepListener<>(); - pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure); - repository.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot(currentState.metadata(), request.includeGlobalState(), false, dataStreams, indexIds), - snapshotInfo, - version, - state -> state, - Priority.IMMEDIATE, - new ActionListener() { - @Override - public void onResponse(RepositoryData repositoryData) { - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - failSnapshotCompletionListeners( - snapshot, - new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") - ); - listener.onFailure( - new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") - ); - return; + pinnedTimestampListener.whenComplete(repoData -> { + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()), + snapshotInfo, + version, + state -> stateWithoutSnapshot(state, snapshot), + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + leaveRepoLoop(repositoryName); + if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException( + repositoryName, + snapshotName, + "Aborting snapshot-v2, no longer cluster manager" + ) + ); + return; + } + listener.onResponse(snapshotInfo); } - updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); - } - @Override - public void onFailure(Exception e) { - logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); + // cleaning up in progress snapshot here + stateWithoutSnapshotV2(newState); + listener.onFailure(e); + } } - } - ); + ); + }, e -> { + logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} {} ", repositoryName, snapshotName, e); + leaveRepoLoop(repositoryName); + // cleaning up in progress snapshot here + stateWithoutSnapshotV2(newState); + listener.onFailure(e); + }); + updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); + } - }, listener::onFailure); - } catch (Exception e) { - assert false : new AssertionError(e); - logger.error("Snapshot-v2 {} creation failed with exception {}", snapshot.getSnapshotId().getName(), e); - listener.onFailure(e); - } + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + + }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } private void createSnapshotPreValidations( @@ -770,12 +842,24 @@ public void cloneSnapshotV2( private SnapshotId sourceSnapshotId; private List indicesForSnapshot; + boolean enteredRepoLoop; + @Override public ClusterState execute(ClusterState currentState) { createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); + // Entering finalize loop here to prevent concurrent snapshots v2 snapshots + enteredRepoLoop = tryEnterRepoLoop(repositoryName); + if (enteredRepoLoop == false) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot start snapshot-v2 while a repository is in finalization state" + ); + } + sourceSnapshotId = repositoryData.getSnapshotIds() .stream() .filter(src -> src.getName().equals(request.source())) @@ -799,14 +883,14 @@ public ClusterState execute(ClusterState currentState) { indicesForSnapshot.add(indexId.getName()); } } - newEntry = SnapshotsInProgress.startClone( snapshot, sourceSnapshotId, repositoryData.resolveIndices(indicesForSnapshot), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null) + minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null), + true ); final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); @@ -817,6 +901,9 @@ public ClusterState execute(ClusterState currentState) { public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot-v2", repositoryName, snapshotName), e); listener.onFailure(e); + if (enteredRepoLoop) { + leaveRepoLoop(repositoryName); + } } @Override @@ -843,67 +930,80 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl true, snapshotInfo.getPinnedTimestamp() ); - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager"); } final StepListener pinnedTimestampListener = new StepListener<>(); pinnedTimestampListener.whenComplete(repoData -> { - logger.info("snapshot-v2 clone [{}] completed successfully", snapshot); - listener.onResponse(null); - }, listener::onFailure); - repository.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot( - currentState.metadata(), - newEntry.includeGlobalState(), - false, - newEntry.dataStreams(), - newEntry.indices() - ), - cloneSnapshotInfo, - repositoryData.getVersion(sourceSnapshotId), - state -> stateWithoutSnapshot(state, snapshot), - Priority.IMMEDIATE, - new ActionListener() { - @Override - public void onResponse(RepositoryData repositoryData) { - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - failSnapshotCompletionListeners( - snapshot, - new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager") - ); - listener.onFailure( - new SnapshotException( - repositoryName, - snapshotName, - "Aborting Snapshot-v2 clone, no longer cluster manager" - ) - ); - return; + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot( + currentState.metadata(), + newEntry.includeGlobalState(), + false, + newEntry.dataStreams(), + newEntry.indices() + ), + cloneSnapshotInfo, + repositoryData.getVersion(sourceSnapshotId), + state -> stateWithoutSnapshot(state, snapshot), + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + leaveRepoLoop(repositoryName); + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException( + repositoryName, + snapshotName, + "Aborting Snapshot-v2 clone, no longer cluster manager" + ) + ); + return; + } + logger.info("snapshot-v2 clone [{}] completed successfully", snapshot); + listener.onResponse(null); } - cloneSnapshotPinnedTimestamp( - repositoryData, - sourceSnapshotId, - snapshot, - snapshotInfo.getPinnedTimestamp(), - pinnedTimestampListener - ); - } - @Override - public void onFailure(Exception e) { - logger.error( - "Failed to upload files to snapshot repo {} for clone snapshot-v2 {} ", - repositoryName, - snapshotName - ); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + logger.error( + "Failed to upload files to snapshot repo {} for clone snapshot-v2 {} ", + repositoryName, + snapshotName + ); + stateWithoutSnapshotV2(newState); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + } } - } + ); + }, e -> { + logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName); + stateWithoutSnapshotV2(newState); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + }); + + cloneSnapshotPinnedTimestamp( + repositoryData, + sourceSnapshotId, + snapshot, + snapshotInfo.getPinnedTimestamp(), + pinnedTimestampListener ); - - }, listener::onFailure); + }, e -> { + logger.error("Failed to retrieve snapshot info for snapshot-v2 {} {} ", repositoryName, snapshotName); + stateWithoutSnapshotV2(newState); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + }); } @Override @@ -1486,6 +1586,13 @@ public void applyClusterState(ClusterChangedEvent event) { // cluster-manager SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final boolean newClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager() == false; + + if (newClusterManager && snapshotsInProgress.entries().isEmpty() == false) { + // clean up snapshot v2 in progress or clone v2 present. + // Snapshot v2 create and clone are sync operation . In case of cluster manager failures in midst , we won't + // send ack to caller and won't continue on new cluster manager . Caller will need to retry it. + stateWithoutSnapshotV2(event.state()); + } processExternalChanges( newClusterManager || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event) @@ -1597,7 +1704,14 @@ private void processExternalChanges(boolean changedNodes, boolean startShards) { @Override public ClusterState execute(ClusterState currentState) { RoutingTable routingTable = currentState.routingTable(); - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + // Removing shallow snapshots v2 as we we take care of these in stateWithoutSnapshotV2() + snapshots = SnapshotsInProgress.of( + snapshots.entries() + .stream() + .filter(snapshot -> snapshot.remoteStoreIndexShallowCopyV2() == false) + .collect(Collectors.toList()) + ); DiscoveryNodes nodes = currentState.nodes(); boolean changed = false; final EnumSet statesToUpdate; @@ -1654,7 +1768,7 @@ public ClusterState execute(ClusterState currentState) { changed = true; logger.debug("[{}] was found in dangling INIT or ABORTED state", snapshot); } else { - if (snapshot.state().completed() || completed(snapshot.shards().values())) { + if ((snapshot.state().completed() || completed(snapshot.shards().values()))) { finishedSnapshots.add(snapshot); } updatedSnapshotEntries.add(snapshot); @@ -2180,6 +2294,59 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn return readyDeletions(result).v1(); } + private void stateWithoutSnapshotV2(ClusterState state) { + SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + boolean changed = false; + ArrayList entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.remoteStoreIndexShallowCopyV2()) { + changed = true; + } else { + entries.add(entry); + } + } + if (changed) { + logger.info("Cleaning up in progress v2 snapshots now"); + clusterService.submitStateUpdateTask( + "remove in progress snapshot v2 after cluster manager switch", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + boolean changed = false; + ArrayList entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.remoteStoreIndexShallowCopyV2()) { + changed = true; + } else { + entries.add(entry); + } + } + if (changed) { + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))) + .build(); + } else { + return currentState; + } + } + + @Override + public void onFailure(String source, Exception e) { + // execute never fails , so we should never hit this. + logger.warn( + () -> new ParameterizedMessage( + "failed to remove in progress snapshot v2 state after cluster manager switch {}", + e + ), + e + ); + } + } + ); + } + } + /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the @@ -3330,6 +3497,9 @@ public boolean assertAllListenersResolved() { + " on [" + localNode + "]"; + if (repositoryOperations.isEmpty() == false) { + logger.info("Not empty"); + } assert repositoryOperations.isEmpty() : "Found leaked snapshots to finalize " + repositoryOperations + " on [" + localNode + "]"; return true; } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java index 8fd1f44286094..d79cb62b6b7ac 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -214,7 +214,11 @@ public void testSerDeRemoteStoreIndexShallowCopy() throws IOException { assert (curr_entry.remoteStoreIndexShallowCopy() == false); } } - try (StreamInput in = out.bytes().streamInput()) { + + BytesStreamOutput out2 = new BytesStreamOutput(); + out2.setVersion(Version.V_2_9_0); + snapshotsInProgress.writeTo(out2); + try (StreamInput in = out2.bytes().streamInput()) { in.setVersion(Version.V_2_9_0); actualSnapshotsInProgress = new SnapshotsInProgress(in); assert in.available() == 0;