From 8041083dbfddb95d15d72c1ef1a4933e809b4972 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Tue, 9 May 2023 11:59:36 -0700 Subject: [PATCH 1/9] Add Changes in Snapshot Clone Flow for remote store interoperability. Signed-off-by: Bansi Kasundra --- .../opensearch/snapshots/CloneSnapshotIT.java | 18 ++- .../cluster/SnapshotsInProgress.java | 5 +- .../RemoteStoreShardShallowCopySnapshot.java | 25 +++ .../index/store/lockmanager/FileLockInfo.java | 5 + .../lockmanager/RemoteStoreLockManager.java | 8 + .../RemoteStoreMetadataLockManager.java | 19 +++ .../repositories/FilterRepository.java | 4 +- .../opensearch/repositories/Repository.java | 2 + .../blobstore/BlobStoreRepository.java | 149 +++++++++++------- .../snapshots/SnapshotsService.java | 136 ++++++++++------ .../RepositoriesServiceTests.java | 2 + .../snapshots/SnapshotsServiceTests.java | 3 +- .../index/shard/RestoreOnlyRepository.java | 2 + 13 files changed, 261 insertions(+), 117 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 5441dae9703ce..65653bb495491 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -95,7 +95,14 @@ public void testShardClone() throws Exception { final String currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); final String newShardGeneration = PlainActionFuture.get( - f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, currentShardGen, f) + f -> repository.cloneShardSnapshot( + sourceSnapshotInfo.snapshotId(), + targetSnapshotId, + repositoryShardId, + currentShardGen, + null, + f + ) ); final BlobStoreIndexShardSnapshot targetShardSnapshot = readShardSnapshot(repository, repositoryShardId, targetSnapshotId); @@ -119,7 +126,14 @@ public void testShardClone() throws Exception { // verify that repeated cloning is idempotent final String newShardGeneration2 = PlainActionFuture.get( - f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, newShardGeneration, f) + f -> repository.cloneShardSnapshot( + sourceSnapshotInfo.snapshotId(), + targetSnapshotId, + repositoryShardId, + newShardGeneration, + null, + f + ) ); assertEquals(newShardGeneration, newShardGeneration2); } diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index d827dc6409778..94f706d467897 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -150,7 +150,8 @@ public static Entry startClone( List indices, long startTime, long repositoryStateId, - Version version + Version version, + boolean isRemoteStoreInteropEnabled ) { return new SnapshotsInProgress.Entry( snapshot, @@ -167,7 +168,7 @@ public static Entry startClone( version, source, Map.of(), - false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create. + isRemoteStoreInteropEnabled ); } diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java index 8e6ed870c904f..d9be8af07fa1e 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java @@ -408,4 +408,29 @@ private void verifyParameters( throw new IllegalArgumentException(exceptionStr); } } + + /** + * Creates a new instance which has a different name and zero incremental file counts but is identical to this instance in terms of the files + * it references. + * + * @param targetSnapshotName target snapshot name + * @param startTime time the clone operation on the repository was started + * @param time time it took to create the clone + */ + public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, long startTime, long time) { + return new RemoteStoreShardShallowCopySnapshot( + targetSnapshotName, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + 0, + 0, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames + ); + } } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java index a8fb7bf20c393..e0bf67167ebc7 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java @@ -54,6 +54,11 @@ List getLocksForAcquirer(String[] lockFiles) { if (acquirerId == null || acquirerId.isBlank()) { throw new IllegalArgumentException("Acquirer ID should be provided"); } + List locksForAcquirer = Arrays.stream(lockFiles) + .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) + .collect(Collectors.toList()); + assert locksForAcquirer.size() == 1 : "Multiple lock files found for acquirer"; + return locksForAcquirer; return Arrays.stream(lockFiles) .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java index c30be082b4795..0859a7bc60c21 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -39,6 +39,14 @@ public interface RemoteStoreLockManager { */ Boolean isAcquired(LockInfo lockInfo) throws IOException; + /** + * + * @param originalLockInfo lock info instance for original lock. + * @param clonedLockInfo lock info instance for which lock needs to be cloned. + * @throws IOException throws IOException if originalResource itself do not have any lock. + */ + void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException; + /* Deletes all lock related files and directories */ diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 7df20cae10664..f348c8b51d183 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -84,6 +84,25 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { return !lockFiles.isEmpty(); } + /** + * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo + * @param originalLockInfo lock info instance for original lock. + * @param clonedLockInfo lock info instance for which lock needs to be cloned. + * @throws IOException throws IOException if originalResource itself do not have any lock. + */ + @Override + public void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException { + assert originalLockInfo instanceof FileLockInfo : "originalLockInfo should be instance of ShardLockInfo"; + assert clonedLockInfo instanceof FileLockInfo : "clonedLockInfo should be instance of ShardLockInfo"; + String originalResourceId = ((FileLockInfo) originalLockInfo).getAcquirerId(); + String clonedResourceId = ((FileLockInfo) clonedLockInfo).getAcquirerId(); + assert originalResourceId != null && clonedResourceId != null : "provided resourceIds should not be null"; + String[] lockFiles = lockDirectory.listAll(); + String lockNameForAcquirer = ((FileLockInfo) originalLockInfo).getLocksForAcquirer(lockFiles).get(0); + String fileToLockName = FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lockNameForAcquirer); + acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(fileToLockName).withAcquirerId(clonedResourceId).build()); + } + public void delete() throws IOException { lockDirectory.delete(); } diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index b108e2da1ab04..7c70b5ae7df93 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -47,6 +47,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; @@ -254,9 +255,10 @@ public void cloneShardSnapshot( SnapshotId target, RepositoryShardId shardId, String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener); + in.cloneShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index c08369b79452d..4988f0bf9a91a 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -48,6 +48,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; @@ -372,6 +373,7 @@ void cloneShardSnapshot( SnapshotId target, RepositoryShardId shardId, @Nullable String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f04bf83c2f1d1..c8b9cf9b7b64e 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -114,6 +114,9 @@ import org.opensearch.index.snapshots.blobstore.SnapshotFiles; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.lockmanager.FileLockInfo; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.IndexId; @@ -494,6 +497,7 @@ public void cloneShardSnapshot( SnapshotId target, RepositoryShardId shardId, @Nullable String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { if (isReadOnly()) { @@ -506,74 +510,97 @@ public void cloneShardSnapshot( executor.execute(ActionRunnable.supply(listener, () -> { final long startTime = threadPool.absoluteTimeInMillis(); final BlobContainer shardContainer = shardContainer(index, shardNum); - final BlobStoreIndexShardSnapshots existingSnapshots; final String newGen; - final String existingShardGen; - if (shardGeneration == null) { - Tuple tuple = buildBlobStoreIndexShardSnapshots( - shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(), - shardContainer + if (remoteStoreLockManagerFactory == null) { + final BlobStoreIndexShardSnapshots existingSnapshots; + final String existingShardGen; + if (shardGeneration == null) { + Tuple tuple = buildBlobStoreIndexShardSnapshots( + shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(), + shardContainer + ); + existingShardGen = String.valueOf(tuple.v2()); + newGen = String.valueOf(tuple.v2() + 1); + existingSnapshots = tuple.v1(); + } else { + newGen = UUIDs.randomBase64UUID(); + existingSnapshots = buildBlobStoreIndexShardSnapshots(Collections.emptySet(), shardContainer, shardGeneration).v1(); + existingShardGen = shardGeneration; + } + SnapshotFiles existingTargetFiles = null; + SnapshotFiles sourceFiles = null; + for (SnapshotFiles existingSnapshot : existingSnapshots) { + final String snapshotName = existingSnapshot.snapshot(); + if (snapshotName.equals(target.getName())) { + existingTargetFiles = existingSnapshot; + } else if (snapshotName.equals(source.getName())) { + sourceFiles = existingSnapshot; + } + if (sourceFiles != null && existingTargetFiles != null) { + break; + } + } + if (sourceFiles == null) { + throw new RepositoryException( + metadata.name(), + "Can't create clone of [" + + shardId + + "] for snapshot [" + + target + + "]. The source snapshot [" + + source + + "] was not found in the shard metadata." + ); + } + if (existingTargetFiles != null) { + if (existingTargetFiles.isSame(sourceFiles)) { + return existingShardGen; + } + throw new RepositoryException( + metadata.name(), + "Can't create clone of [" + + shardId + + "] for snapshot [" + + target + + "]. A snapshot by that name already exists for this shard." + ); + } + final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source); + logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target); + INDEX_SHARD_SNAPSHOT_FORMAT.write( + sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compress + ); + INDEX_SHARD_SNAPSHOTS_FORMAT.write( + existingSnapshots.withClone(source.getName(), target.getName()), + shardContainer, + newGen, + compress ); - existingShardGen = String.valueOf(tuple.v2()); - newGen = String.valueOf(tuple.v2() + 1); - existingSnapshots = tuple.v1(); } else { newGen = UUIDs.randomBase64UUID(); - existingSnapshots = buildBlobStoreIndexShardSnapshots(Collections.emptySet(), shardContainer, shardGeneration).v1(); - existingShardGen = shardGeneration; - } - SnapshotFiles existingTargetFiles = null; - SnapshotFiles sourceFiles = null; - for (SnapshotFiles existingSnapshot : existingSnapshots) { - final String snapshotName = existingSnapshot.snapshot(); - if (snapshotName.equals(target.getName())) { - existingTargetFiles = existingSnapshot; - } else if (snapshotName.equals(source.getName())) { - sourceFiles = existingSnapshot; - } - if (sourceFiles != null && existingTargetFiles != null) { - break; - } - } - if (sourceFiles == null) { - throw new RepositoryException( - metadata.name(), - "Can't create clone of [" - + shardId - + "] for snapshot [" - + target - + "]. The source snapshot [" - + source - + "] was not found in the shard metadata." + RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadRemStoreEnabledShardSnapshot(shardContainer, source); + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compress ); - } - if (existingTargetFiles != null) { - if (existingTargetFiles.isSame(sourceFiles)) { - return existingShardGen; - } - throw new RepositoryException( - metadata.name(), - "Can't create clone of [" - + shardId - + "] for snapshot [" - + target - + "]. A snapshot by that name already exists for this shard." + String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); + String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepository, + indexUUID, + String.valueOf(shardId.shardId()) + ); + + remoteStoreMetadataLockManger.cloneLock( + FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), + FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() ); } - final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source); - logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target); - INDEX_SHARD_SNAPSHOT_FORMAT.write( - sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), - shardContainer, - target.getUUID(), - compressor - ); - INDEX_SHARD_SNAPSHOTS_FORMAT.write( - existingSnapshots.withClone(source.getName(), target.getName()), - shardContainer, - newGen, - compressor - ); return newGen; })); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index b523c1ba12b05..4b0c8a33196ab 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -90,6 +90,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -476,13 +477,16 @@ public ClusterState execute(ClusterState currentState) { + "]" ); } + final boolean isRemoteStoreInteropEnabled = repository.getSnapshotInfo(sourceSnapshotId) + .isRemoteStoreIndexShallowCopyEnabled(); newEntry = SnapshotsInProgress.startClone( snapshot, sourceSnapshotId, repositoryData.resolveIndices(matchingIndices), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null) + minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null), + isRemoteStoreInteropEnabled ); final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); @@ -669,58 +673,90 @@ private void runReadyClone( RepositoryShardId repoShardId, Repository repository ) { - final SnapshotId targetSnapshot = target.getSnapshotId(); - final String localNodeId = clusterService.localNode().getId(); - if (currentlyCloning.add(repoShardId)) { - repository.cloneShardSnapshot( - sourceSnapshot, - targetSnapshot, - repoShardId, - shardStatusBefore.generation(), - ActionListener.wrap( - generation -> innerUpdateSnapshotState( - new ShardSnapshotUpdate(target, repoShardId, new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as successfully cloned from [{}] to [{}]", - repoShardId, - sourceSnapshot, - targetSnapshot - ), - e -> { - logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(e); - } - ), - () -> currentlyCloning.remove(repoShardId) - ) - ), - e -> innerUpdateSnapshotState( - new ShardSnapshotUpdate( - target, + repository.getRepositoryData(new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + try { + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory = null; + final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData( + repositoryData, + sourceSnapshot, + repoShardId.index() + ); + final boolean isRemoteStoreInteropEnabled = repository.getSnapshotInfo(sourceSnapshot) + .isRemoteStoreIndexShallowCopyEnabled(); + if (isRemoteStoreInteropEnabled + && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { + remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); + } + final SnapshotId targetSnapshot = target.getSnapshotId(); + final String localNodeId = clusterService.localNode().getId(); + if (currentlyCloning.add(repoShardId)) { + repository.cloneShardSnapshot( + sourceSnapshot, + targetSnapshot, repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) - ), - ActionListener.runBefore( + shardStatusBefore.generation(), + remoteStoreLockManagerFactory, ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as failed clone from [{}] to [{}]", - repoShardId, - sourceSnapshot, - targetSnapshot + generation -> innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace( + "Marked [{}] as successfully cloned from [{}] to [{}]", + repoShardId, + sourceSnapshot, + targetSnapshot + ), + e -> { + logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(e); + } + ), + () -> currentlyCloning.remove(repoShardId) + ) ), - ex -> { - logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(ex); - } - ), - () -> currentlyCloning.remove(repoShardId) - ) - ) - ) - ); - } + e -> innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace( + "Marked [{}] as failed clone from [{}] to [{}]", + repoShardId, + sourceSnapshot, + targetSnapshot + ), + ex -> { + logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(ex); + } + ), + () -> currentlyCloning.remove(repoShardId) + ) + ) + ) + ); + } + } catch (IOException e) { + logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName()); + } + } + + @Override + public void onFailure(Exception e) { + assert false : new AssertionError(e); + logger.warn("Failed to get repository data ", e); + } + }); } private void ensureBelowConcurrencyLimit( diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index f5295bead19a4..21fdf0f6f0f8d 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -60,6 +60,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; @@ -373,6 +374,7 @@ public void cloneShardSnapshot( SnapshotId target, RepositoryShardId shardId, String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java index 9b12212c791a2..eee51ee70e6b3 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java @@ -498,7 +498,8 @@ private static SnapshotsInProgress.Entry cloneEntry( .map(k -> k.index()) .distinct() .collect(Collectors.toList()); - return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones); + return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT, false) + .withClones(clones); } private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 2a85fffa8699a..58b3f0308f43c 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -44,6 +44,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.IndexMetaDataGenerations; import org.opensearch.repositories.Repository; @@ -201,6 +202,7 @@ public void cloneShardSnapshot( SnapshotId target, RepositoryShardId repositoryShardId, String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { throw new UnsupportedOperationException("Unsupported for restore-only repository"); From 656c40cbc9d9e412d55399df5500712106f5f635 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Thu, 18 May 2023 13:20:29 -0700 Subject: [PATCH 2/9] Addressed PR comments Signed-off-by: Bansi Kasundra --- .../opensearch/snapshots/CloneSnapshotIT.java | 18 +- .../cluster/SnapshotsInProgress.java | 5 +- .../RemoteStoreMetadataLockManager.java | 4 +- .../repositories/FilterRepository.java | 15 +- .../opensearch/repositories/Repository.java | 18 ++ .../blobstore/BlobStoreRepository.java | 189 ++++++++++-------- .../snapshots/SnapshotsService.java | 142 +++++++------ .../RepositoriesServiceTests.java | 11 + .../index/shard/RestoreOnlyRepository.java | 12 ++ 9 files changed, 250 insertions(+), 164 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 65653bb495491..5441dae9703ce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -95,14 +95,7 @@ public void testShardClone() throws Exception { final String currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); final String newShardGeneration = PlainActionFuture.get( - f -> repository.cloneShardSnapshot( - sourceSnapshotInfo.snapshotId(), - targetSnapshotId, - repositoryShardId, - currentShardGen, - null, - f - ) + f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, currentShardGen, f) ); final BlobStoreIndexShardSnapshot targetShardSnapshot = readShardSnapshot(repository, repositoryShardId, targetSnapshotId); @@ -126,14 +119,7 @@ public void testShardClone() throws Exception { // verify that repeated cloning is idempotent final String newShardGeneration2 = PlainActionFuture.get( - f -> repository.cloneShardSnapshot( - sourceSnapshotInfo.snapshotId(), - targetSnapshotId, - repositoryShardId, - newShardGeneration, - null, - f - ) + f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, newShardGeneration, f) ); assertEquals(newShardGeneration, newShardGeneration2); } diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 94f706d467897..8f776e3e3ae73 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -142,6 +142,7 @@ public static Entry startedEntry( * @param startTime start time * @param repositoryStateId repository state id that this clone is based on * @param version repository metadata version to write + * @param remoteStoreIndexShallowCopy if it is a shallow snapshot * @return snapshot clone entry */ public static Entry startClone( @@ -151,7 +152,7 @@ public static Entry startClone( long startTime, long repositoryStateId, Version version, - boolean isRemoteStoreInteropEnabled + boolean remoteStoreIndexShallowCopy ) { return new SnapshotsInProgress.Entry( snapshot, @@ -168,7 +169,7 @@ public static Entry startClone( version, source, Map.of(), - isRemoteStoreInteropEnabled + remoteStoreIndexShallowCopy ); } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index f348c8b51d183..67060d2519840 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -92,8 +92,8 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { */ @Override public void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException { - assert originalLockInfo instanceof FileLockInfo : "originalLockInfo should be instance of ShardLockInfo"; - assert clonedLockInfo instanceof FileLockInfo : "clonedLockInfo should be instance of ShardLockInfo"; + assert originalLockInfo instanceof FileLockInfo : "originalLockInfo should be instance of FileLockInfo"; + assert clonedLockInfo instanceof FileLockInfo : "clonedLockInfo should be instance of FileLockInfo"; String originalResourceId = ((FileLockInfo) originalLockInfo).getAcquirerId(); String clonedResourceId = ((FileLockInfo) clonedLockInfo).getAcquirerId(); assert originalResourceId != null && clonedResourceId != null : "provided resourceIds should not be null"; diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 7c70b5ae7df93..764f36df6d337 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -250,7 +250,7 @@ public void executeConsistentStateUpdate( } @Override - public void cloneShardSnapshot( + public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, RepositoryShardId shardId, @@ -258,7 +258,18 @@ public void cloneShardSnapshot( RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - in.cloneShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener); + in.cloneRemoteStoreIndexShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener); + } + + @Override + public void cloneShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + String shardGeneration, + ActionListener listener + ) { + in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 4988f0bf9a91a..a0c6d263a4b55 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -369,6 +369,24 @@ void executeConsistentStateUpdate( * @param listener listener to complete with new shard generation once clone has completed */ void cloneShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + @Nullable String shardGeneration, + ActionListener listener + ); + + /** + * Clones a remote store index shard snapshot. + * + * @param source source snapshot + * @param target target snapshot + * @param shardId shard id + * @param shardGeneration shard generation in repo + * @param remoteStoreLockManagerFactory remoteStoreLockManagerFactory for cloning metadata lock file + * @param listener listener to complete with new shard generation once clone has completed + */ + void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, RepositoryShardId shardId, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c8b9cf9b7b64e..fa14c57218432 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -497,7 +497,6 @@ public void cloneShardSnapshot( SnapshotId target, RepositoryShardId shardId, @Nullable String shardGeneration, - RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { if (isReadOnly()) { @@ -511,100 +510,120 @@ public void cloneShardSnapshot( final long startTime = threadPool.absoluteTimeInMillis(); final BlobContainer shardContainer = shardContainer(index, shardNum); final String newGen; - if (remoteStoreLockManagerFactory == null) { - final BlobStoreIndexShardSnapshots existingSnapshots; - final String existingShardGen; - if (shardGeneration == null) { - Tuple tuple = buildBlobStoreIndexShardSnapshots( - shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(), - shardContainer - ); - existingShardGen = String.valueOf(tuple.v2()); - newGen = String.valueOf(tuple.v2() + 1); - existingSnapshots = tuple.v1(); - } else { - newGen = UUIDs.randomBase64UUID(); - existingSnapshots = buildBlobStoreIndexShardSnapshots(Collections.emptySet(), shardContainer, shardGeneration).v1(); - existingShardGen = shardGeneration; - } - SnapshotFiles existingTargetFiles = null; - SnapshotFiles sourceFiles = null; - for (SnapshotFiles existingSnapshot : existingSnapshots) { - final String snapshotName = existingSnapshot.snapshot(); - if (snapshotName.equals(target.getName())) { - existingTargetFiles = existingSnapshot; - } else if (snapshotName.equals(source.getName())) { - sourceFiles = existingSnapshot; - } - if (sourceFiles != null && existingTargetFiles != null) { - break; - } - } - if (sourceFiles == null) { - throw new RepositoryException( - metadata.name(), - "Can't create clone of [" - + shardId - + "] for snapshot [" - + target - + "]. The source snapshot [" - + source - + "] was not found in the shard metadata." - ); - } - if (existingTargetFiles != null) { - if (existingTargetFiles.isSame(sourceFiles)) { - return existingShardGen; - } - throw new RepositoryException( - metadata.name(), - "Can't create clone of [" - + shardId - + "] for snapshot [" - + target - + "]. A snapshot by that name already exists for this shard." - ); - } - final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source); - logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target); - INDEX_SHARD_SNAPSHOT_FORMAT.write( - sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), - shardContainer, - target.getUUID(), - compress - ); - INDEX_SHARD_SNAPSHOTS_FORMAT.write( - existingSnapshots.withClone(source.getName(), target.getName()), - shardContainer, - newGen, - compress + final BlobStoreIndexShardSnapshots existingSnapshots; + final String existingShardGen; + if (shardGeneration == null) { + Tuple tuple = buildBlobStoreIndexShardSnapshots( + shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(), + shardContainer ); + existingShardGen = String.valueOf(tuple.v2()); + newGen = String.valueOf(tuple.v2() + 1); + existingSnapshots = tuple.v1(); } else { newGen = UUIDs.randomBase64UUID(); - RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadRemStoreEnabledShardSnapshot(shardContainer, source); - REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( - remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), - shardContainer, - target.getUUID(), - compress - ); - String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); - String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); - RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepository, - indexUUID, - String.valueOf(shardId.shardId()) + existingSnapshots = buildBlobStoreIndexShardSnapshots(Collections.emptySet(), shardContainer, shardGeneration).v1(); + existingShardGen = shardGeneration; + } + SnapshotFiles existingTargetFiles = null; + SnapshotFiles sourceFiles = null; + for (SnapshotFiles existingSnapshot : existingSnapshots) { + final String snapshotName = existingSnapshot.snapshot(); + if (snapshotName.equals(target.getName())) { + existingTargetFiles = existingSnapshot; + } else if (snapshotName.equals(source.getName())) { + sourceFiles = existingSnapshot; + } + if (sourceFiles != null && existingTargetFiles != null) { + break; + } + } + if (sourceFiles == null) { + throw new RepositoryException( + metadata.name(), + "Can't create clone of [" + + shardId + + "] for snapshot [" + + target + + "]. The source snapshot [" + + source + + "] was not found in the shard metadata." ); - - remoteStoreMetadataLockManger.cloneLock( - FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), - FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() + } + if (existingTargetFiles != null) { + if (existingTargetFiles.isSame(sourceFiles)) { + return existingShardGen; + } + throw new RepositoryException( + metadata.name(), + "Can't create clone of [" + + shardId + + "] for snapshot [" + + target + + "]. A snapshot by that name already exists for this shard." ); } + final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source); + logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target); + INDEX_SHARD_SNAPSHOT_FORMAT.write( + sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compress + ); + INDEX_SHARD_SNAPSHOTS_FORMAT.write( + existingSnapshots.withClone(source.getName(), target.getName()), + shardContainer, + newGen, + compress + ); return newGen; })); } + @Override + public void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + @Nullable String shardGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + if (isReadOnly()) { + listener.onFailure(new RepositoryException(metadata.name(), "cannot clone shard snapshot on a readonly repository")); + return; + } + final IndexId index = shardId.index(); + final int shardNum = shardId.shardId(); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + executor.execute(ActionRunnable.supply(listener, () -> { + final long startTime = threadPool.absoluteTimeInMillis(); + final BlobContainer shardContainer = shardContainer(index, shardNum); + RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadRemStoreEnabledShardSnapshot(shardContainer, source); + + String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); + String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepository, + indexUUID, + String.valueOf(shardId.shardId()) + ); + remoteStoreMetadataLockManger.cloneLock( + FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), + FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() + ); + + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compress + ); + return shardGeneration; + })); + } + // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates // #latestKnownRepoGen if a newer than currently known generation is found @Override diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 4b0c8a33196ab..4c67aa3a52c71 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -90,6 +90,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; @@ -152,6 +153,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final RepositoriesService repositoriesService; + private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; + private final ThreadPool threadPool; private final Map>>> snapshotCompletionListeners = @@ -207,6 +210,7 @@ public SnapshotsService( this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.repositoriesService = repositoriesService; + this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); this.threadPool = transportService.getThreadPool(); this.transportService = transportService; @@ -477,7 +481,7 @@ public ClusterState execute(ClusterState currentState) { + "]" ); } - final boolean isRemoteStoreInteropEnabled = repository.getSnapshotInfo(sourceSnapshotId) + final boolean remoteStoreIndexShallowCopy = repository.getSnapshotInfo(sourceSnapshotId) .isRemoteStoreIndexShallowCopyEnabled(); newEntry = SnapshotsInProgress.startClone( snapshot, @@ -486,7 +490,7 @@ public ClusterState execute(ClusterState currentState) { threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null), - isRemoteStoreInteropEnabled + remoteStoreIndexShallowCopy ); final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); @@ -677,74 +681,98 @@ private void runReadyClone( @Override public void onResponse(RepositoryData repositoryData) { try { - RemoteStoreLockManagerFactory remoteStoreLockManagerFactory = null; final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData( repositoryData, sourceSnapshot, repoShardId.index() ); - final boolean isRemoteStoreInteropEnabled = repository.getSnapshotInfo(sourceSnapshot) + final boolean remoteStoreIndexShallowCopy = repository.getSnapshotInfo(sourceSnapshot) .isRemoteStoreIndexShallowCopyEnabled(); - if (isRemoteStoreInteropEnabled - && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { - remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); - } + final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy + && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); final SnapshotId targetSnapshot = target.getSnapshotId(); final String localNodeId = clusterService.localNode().getId(); - if (currentlyCloning.add(repoShardId)) { - repository.cloneShardSnapshot( - sourceSnapshot, - targetSnapshot, - repoShardId, - shardStatusBefore.generation(), - remoteStoreLockManagerFactory, - ActionListener.wrap( - generation -> innerUpdateSnapshotState( - new ShardSnapshotUpdate( - target, + final ActionListener listener = ActionListener.wrap( + generation -> innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace( + "Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation) + sourceSnapshot, + targetSnapshot ), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as successfully cloned from [{}] to [{}]", - repoShardId, - sourceSnapshot, - targetSnapshot - ), - e -> { - logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(e); - } - ), - () -> currentlyCloning.remove(repoShardId) - ) + e -> { + logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(e); + } ), - e -> innerUpdateSnapshotState( - new ShardSnapshotUpdate( - target, - repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) - ), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as failed clone from [{}] to [{}]", - repoShardId, - sourceSnapshot, - targetSnapshot - ), - ex -> { - logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(ex); - } + () -> currentlyCloning.remove(repoShardId) + ) + ), + e -> { + if (cloneRemoteStoreIndexShardSnapshot) { + final String indexUUID = indexMetadata.getIndexUUID(); + final String remoteStoreRepoForIndex = indexMetadata.getSettings() + .get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); + try { + remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepoForIndex, + indexUUID, + String.valueOf(repoShardId.shardId()) + ).release(FileLockInfo.getLockInfoBuilder().withAcquirerId(targetSnapshot.getUUID()).build()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace( + "Marked [{}] as failed clone from [{}] to [{}]", + repoShardId, + sourceSnapshot, + targetSnapshot ), - () -> currentlyCloning.remove(repoShardId) - ) + ex -> { + logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(ex); + } + ), + () -> currentlyCloning.remove(repoShardId) ) - ) - ); + ); + } + ); + if (currentlyCloning.add(repoShardId)) { + if (cloneRemoteStoreIndexShardSnapshot) { + repository.cloneRemoteStoreIndexShardSnapshot( + sourceSnapshot, + targetSnapshot, + repoShardId, + shardStatusBefore.generation(), + remoteStoreLockManagerFactory, + listener + ); + } else { + repository.cloneShardSnapshot( + sourceSnapshot, + targetSnapshot, + repoShardId, + shardStatusBefore.generation(), + listener + ); + } } } catch (IOException e) { logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName()); diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 21fdf0f6f0f8d..edf5b6c84bc54 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -370,6 +370,17 @@ public void executeConsistentStateUpdate( @Override public void cloneShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + String shardGeneration, + ActionListener listener + ) { + + } + + @Override + public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, RepositoryShardId shardId, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 58b3f0308f43c..dfcda018941d6 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Nullable; import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; @@ -202,6 +203,17 @@ public void cloneShardSnapshot( SnapshotId target, RepositoryShardId repositoryShardId, String shardGeneration, + ActionListener listener + ) { + throw new UnsupportedOperationException("Unsupported for restore-only repository"); + } + + @Override + public void cloneRemoteStoreIndexShardSnapshot( + SnapshotId source, + SnapshotId target, + RepositoryShardId shardId, + @Nullable String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { From 90ffb173f15e01aca8196ee0b96289dd43670017 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Mon, 5 Jun 2023 13:05:57 -0700 Subject: [PATCH 3/9] Minor Clone Snapshot changes related to parameter Signed-off-by: Bansi Kasundra --- .../cluster/SnapshotsInProgress.java | 30 ++- .../blobstore/BlobStoreRepository.java | 8 +- .../snapshots/SnapshotsService.java | 187 +++++++++--------- .../snapshots/SnapshotsServiceTests.java | 3 +- .../index/shard/RestoreOnlyRepository.java | 3 +- 5 files changed, 129 insertions(+), 102 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 8f776e3e3ae73..a121f2182f158 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -142,7 +142,6 @@ public static Entry startedEntry( * @param startTime start time * @param repositoryStateId repository state id that this clone is based on * @param version repository metadata version to write - * @param remoteStoreIndexShallowCopy if it is a shallow snapshot * @return snapshot clone entry */ public static Entry startClone( @@ -151,8 +150,7 @@ public static Entry startClone( List indices, long startTime, long repositoryStateId, - Version version, - boolean remoteStoreIndexShallowCopy + Version version ) { return new SnapshotsInProgress.Entry( snapshot, @@ -169,7 +167,8 @@ public static Entry startClone( version, source, Map.of(), - remoteStoreIndexShallowCopy + false // initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with + // clone jobs ); } @@ -455,6 +454,29 @@ public Entry withClones(final Map update ); } + public Entry withRemoteStoreIndexShallowCopy(final boolean remoteStoreIndexShallowCopy) { + if (remoteStoreIndexShallowCopy == this.remoteStoreIndexShallowCopy) { + return this; + } + return new Entry( + snapshot, + includeGlobalState, + partial, + state, + indices, + dataStreams, + startTime, + repositoryStateId, + shards, + failure, + userMetadata, + version, + source, + clones, + remoteStoreIndexShallowCopy + ); + } + /** * Create a new instance by aborting this instance. Moving all in-progress shards to {@link ShardState#ABORTED} if assigned to a * data node or to {@link ShardState#FAILED} if not assigned to any data node. diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index fa14c57218432..c4959a0c47783 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -563,6 +563,9 @@ public void cloneShardSnapshot( + "]. A snapshot by that name already exists for this shard." ); } + // We don't need to check if there exists a shallow snapshot with the same name as we have the check before starting the clone + // operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone snapshot would + // have a different UUID and hence a new unique snap-N file will be created. final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source); logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target); INDEX_SHARD_SNAPSHOT_FORMAT.write( @@ -600,7 +603,10 @@ public void cloneRemoteStoreIndexShardSnapshot( executor.execute(ActionRunnable.supply(listener, () -> { final long startTime = threadPool.absoluteTimeInMillis(); final BlobContainer shardContainer = shardContainer(index, shardNum); - RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadRemStoreEnabledShardSnapshot(shardContainer, source); + // We don't need to check if there exists a shallow/full copy snapshot with the same name as we have the check before starting the clone + // operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone shallow snapshot would + // have a different UUID and hence a new unique shallow-snap-N file will be created. + RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadShallowCopyShardSnapshot(shardContainer, source); String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 4c67aa3a52c71..1410fcda53e76 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -88,6 +88,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.lockmanager.FileLockInfo; @@ -481,16 +482,13 @@ public ClusterState execute(ClusterState currentState) { + "]" ); } - final boolean remoteStoreIndexShallowCopy = repository.getSnapshotInfo(sourceSnapshotId) - .isRemoteStoreIndexShallowCopyEnabled(); newEntry = SnapshotsInProgress.startClone( snapshot, sourceSnapshotId, repositoryData.resolveIndices(matchingIndices), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null), - remoteStoreIndexShallowCopy + minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null) ); final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); @@ -629,7 +627,8 @@ public ClusterState execute(ClusterState currentState) { } } } - updatedEntry = cloneEntry.withClones(clonesBuilder); + updatedEntry = cloneEntry.withClones(clonesBuilder) + .withRemoteStoreIndexShallowCopy(snapshotInfoListener.result().isRemoteStoreIndexShallowCopyEnabled());; updatedEntries.set(i, updatedEntry); changed = true; break; @@ -677,112 +676,114 @@ private void runReadyClone( RepositoryShardId repoShardId, Repository repository ) { - repository.getRepositoryData(new ActionListener() { + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + executor.execute(new AbstractRunnable() { @Override - public void onResponse(RepositoryData repositoryData) { - try { - final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData( - repositoryData, - sourceSnapshot, - repoShardId.index() - ); - final boolean remoteStoreIndexShallowCopy = repository.getSnapshotInfo(sourceSnapshot) - .isRemoteStoreIndexShallowCopyEnabled(); - final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy - && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); - final SnapshotId targetSnapshot = target.getSnapshotId(); - final String localNodeId = clusterService.localNode().getId(); - final ActionListener listener = ActionListener.wrap( - generation -> innerUpdateSnapshotState( - new ShardSnapshotUpdate( - target, - repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation) - ), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as successfully cloned from [{}] to [{}]", - repoShardId, - sourceSnapshot, - targetSnapshot - ), - e -> { - logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(e); - } - ), - () -> currentlyCloning.remove(repoShardId) - ) - ), - e -> { - if (cloneRemoteStoreIndexShardSnapshot) { - final String indexUUID = indexMetadata.getIndexUUID(); - final String remoteStoreRepoForIndex = indexMetadata.getSettings() - .get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); - try { - remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepoForIndex, - indexUUID, - String.valueOf(repoShardId.shardId()) - ).release(FileLockInfo.getLockInfoBuilder().withAcquirerId(targetSnapshot.getUUID()).build()); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - innerUpdateSnapshotState( + public void onFailure(Exception e) { + logger.warn("Failed to get Repository data"); + } + + @Override + protected void doRun() throws Exception { + repository.getRepositoryData(ActionListener.wrap(repositoryData -> { + try { + final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData( + repositoryData, + sourceSnapshot, + repoShardId.index() + ); + final boolean remoteStoreIndexShallowCopy = repository.getSnapshotInfo(sourceSnapshot) + .isRemoteStoreIndexShallowCopyEnabled(); + final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy + && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); + final SnapshotId targetSnapshot = target.getSnapshotId(); + final String localNodeId = clusterService.localNode().getId(); + final ActionListener listener = ActionListener.wrap( + generation -> innerUpdateSnapshotState( new ShardSnapshotUpdate( target, repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation) ), ActionListener.runBefore( ActionListener.wrap( v -> logger.trace( - "Marked [{}] as failed clone from [{}] to [{}]", + "Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId, sourceSnapshot, targetSnapshot ), - ex -> { - logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(ex); + e -> { + logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(e); } ), () -> currentlyCloning.remove(repoShardId) ) - ); - } - ); - if (currentlyCloning.add(repoShardId)) { - if (cloneRemoteStoreIndexShardSnapshot) { - repository.cloneRemoteStoreIndexShardSnapshot( - sourceSnapshot, - targetSnapshot, - repoShardId, - shardStatusBefore.generation(), - remoteStoreLockManagerFactory, - listener - ); - } else { - repository.cloneShardSnapshot( - sourceSnapshot, - targetSnapshot, - repoShardId, - shardStatusBefore.generation(), - listener - ); + ), + e -> { + if (cloneRemoteStoreIndexShardSnapshot) { + final String indexUUID = indexMetadata.getIndexUUID(); + final String remoteStoreRepoForIndex = indexMetadata.getSettings() + .get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); + try { + remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepoForIndex, + indexUUID, + String.valueOf(repoShardId.shardId()) + ).release(FileLockInfo.getLockInfoBuilder().withAcquirerId(targetSnapshot.getUUID()).build()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace( + "Marked [{}] as failed clone from [{}] to [{}]", + repoShardId, + sourceSnapshot, + targetSnapshot + ), + ex -> { + logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(ex); + } + ), + () -> currentlyCloning.remove(repoShardId) + ) + ); + } + ); + if (currentlyCloning.add(repoShardId)) { + if (cloneRemoteStoreIndexShardSnapshot) { + repository.cloneRemoteStoreIndexShardSnapshot( + sourceSnapshot, + targetSnapshot, + repoShardId, + shardStatusBefore.generation(), + remoteStoreLockManagerFactory, + listener + ); + } else { + repository.cloneShardSnapshot( + sourceSnapshot, + targetSnapshot, + repoShardId, + shardStatusBefore.generation(), + listener + ); + } } + } catch (IOException e) { + logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName()); } - } catch (IOException e) { - logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName()); - } - } - - @Override - public void onFailure(Exception e) { - assert false : new AssertionError(e); - logger.warn("Failed to get repository data ", e); + }, this::onFailure)); } }); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java index eee51ee70e6b3..9b12212c791a2 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java @@ -498,8 +498,7 @@ private static SnapshotsInProgress.Entry cloneEntry( .map(k -> k.index()) .distinct() .collect(Collectors.toList()); - return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT, false) - .withClones(clones); + return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones); } private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index dfcda018941d6..67f3f7fc1f50c 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -40,7 +40,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.Nullable; import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; @@ -213,7 +212,7 @@ public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, RepositoryShardId shardId, - @Nullable String shardGeneration, + String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { From 2b5945ae3261291093a21221fcb1f5646cc7fae1 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Tue, 6 Jun 2023 13:08:19 -0700 Subject: [PATCH 4/9] Added Tests for Clone Snapshot API Signed-off-by: Bansi Kasundra --- .../opensearch/snapshots/CloneSnapshotIT.java | 88 +++++++++++++++++++ .../AbstractSnapshotIntegTestCase.java | 42 +++++++++ 2 files changed, 130 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 5441dae9703ce..e1be1525d1043 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -38,11 +38,18 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.cluster.SnapshotsInProgress; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.indices.InvalidAliasNameException; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryData; import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchIntegTestCase; import java.util.ArrayList; @@ -64,6 +71,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.nio.file.Path; +import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -153,6 +161,86 @@ public void testCloneSnapshotIndex() throws Exception { assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize()); } + public void testCloneShallowSnapshotIndex() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + + final String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5,10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + + indexRandomDocs(indexName, randomIntBetween(20, 100)); + + final String shallowSnapshot = "shallow-snapshot"; + createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1); + + if (randomBoolean()) { + assertAcked(admin().indices().prepareDelete(indexName)); + } + + final String sourceSnapshot = shallowSnapshot; + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(shallowSnapshotRepoName, sourceSnapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get()); + logger.info("Lock files count: {}", getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2); + } + + public void testShallowCloneNameAvailability() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startDataOnlyNode(); + + final String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5,10)); + + final String shallowSnapshot1 = "snapshot1"; + createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot1); + + final String shallowSnapshot2 = "snapshot2"; + createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot2); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> startClone(shallowSnapshotRepoName, shallowSnapshot1, shallowSnapshot2, indexName, remoteStoreEnabledIndexName).get() + ); + assertThat(ex.getMessage(), containsString("snapshot with the same name already exists")); + } + public void testClonePreventsSnapshotDelete() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index ddf9f3e96b9b4..3a77734b8e2c3 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -51,16 +51,22 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.compress.CompressorType; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; @@ -122,6 +128,8 @@ protected Settings nodeSettings(int nodeOrdinal) { // Rebalancing is causing some checks after restore to randomly fail // due to https://github.com/elastic/elasticsearch/issues/9421 .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE) + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .put(FeatureFlags.REMOTE_STORE, "true") .build(); } @@ -511,6 +519,26 @@ protected void indexRandomDocs(String index, int numdocs) throws InterruptedExce assertDocCount(index, numdocs); } + protected Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo) + .build(); + } + + protected Settings.Builder snapshotRepoSettingsForShallowCopy() { + final Settings.Builder settings = Settings.builder(); + settings.put("location", randomRepoPath()); + settings.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); + return settings; + } + protected long getCountForIndex(String indexName) { return client().search( new SearchRequest(new SearchRequest(indexName).source(new SearchSourceBuilder().size(0).trackTotalHits(true))) @@ -521,6 +549,20 @@ protected void assertDocCount(String index, long count) { assertEquals(getCountForIndex(index), count); } + protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName) throws IOException { + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreIndex) + .get() + .getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID); + final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class); + final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName); + BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files"); + BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath); + try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) { + return lockDirectory.listAll(); + } + } /** * Adds a snapshot in state {@link SnapshotState#FAILED} to the given repository. * From a1750e30d77ebf0a0f329b14cf522493d8d09e43 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Tue, 6 Jun 2023 13:16:01 -0700 Subject: [PATCH 5/9] Updated changelog and added default implementation Signed-off-by: Bansi Kasundra --- .../main/java/org/opensearch/repositories/Repository.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index a0c6d263a4b55..793b3d317e1bd 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -386,14 +386,16 @@ void cloneShardSnapshot( * @param remoteStoreLockManagerFactory remoteStoreLockManagerFactory for cloning metadata lock file * @param listener listener to complete with new shard generation once clone has completed */ - void cloneRemoteStoreIndexShardSnapshot( + default void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, RepositoryShardId shardId, @Nullable String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener - ); + ) { + throw new UnsupportedOperationException(); + } /** * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} From 7645c83974c5ef532b219566473a5c5dc5f26add Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Tue, 13 Jun 2023 15:40:48 -0700 Subject: [PATCH 6/9] Addressed PR comments Signed-off-by: Bansi Kasundra --- .../opensearch/snapshots/CloneSnapshotIT.java | 89 ++++++++++++++++-- .../RemoteStoreShardShallowCopySnapshot.java | 4 +- .../index/store/lockmanager/FileLockInfo.java | 11 +-- .../lockmanager/RemoteStoreLockManager.java | 4 +- .../RemoteStoreMetadataLockManager.java | 24 ++--- .../repositories/FilterRepository.java | 11 ++- .../opensearch/repositories/Repository.java | 1 + .../blobstore/BlobStoreRepository.java | 91 +++++++++++++------ .../opensearch/snapshots/SnapshotInfo.java | 1 + .../snapshots/SnapshotsService.java | 91 +++++++++---------- .../store/lockmanager/FileLockInfoTests.java | 7 +- .../RepositoriesServiceTests.java | 1 + .../index/shard/RestoreOnlyRepository.java | 1 + .../AbstractSnapshotIntegTestCase.java | 5 +- 14 files changed, 228 insertions(+), 113 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index e1be1525d1043..ce92a15026b70 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -38,14 +38,11 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.cluster.SnapshotsInProgress; -import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; -import org.opensearch.indices.InvalidAliasNameException; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryData; import org.opensearch.snapshots.mockstore.MockRepository; @@ -172,7 +169,8 @@ public void testCloneShallowSnapshotIndex() throws Exception { createRepository(snapshotRepoName, "fs", snapshotRepoPath); final String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; - createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); + final Path shallowSnapshotRepoPath = randomRepoPath(); + createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath)); final Path remoteStoreRepoPath = randomRepoPath(); final String remoteStoreRepoName = "remote-store-repo-name"; @@ -184,7 +182,7 @@ public void testCloneShallowSnapshotIndex() throws Exception { final String remoteStoreEnabledIndexName = "remote-index-1"; final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); - indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5,10)); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); final String snapshot = "snapshot"; createFullSnapshot(snapshotRepoName, snapshot); @@ -214,7 +212,8 @@ public void testShallowCloneNameAvailability() throws Exception { internalCluster().startDataOnlyNode(); final String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; - createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); + final Path shallowSnapshotRepoPath = randomRepoPath(); + createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath)); final Path remoteStoreRepoPath = randomRepoPath(); final String remoteStoreRepoName = "remote-store-repo-name"; @@ -226,7 +225,7 @@ public void testShallowCloneNameAvailability() throws Exception { final String remoteStoreEnabledIndexName = "remote-index-1"; final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); - indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5,10)); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); final String shallowSnapshot1 = "snapshot1"; createFullSnapshot(shallowSnapshotRepoName, shallowSnapshot1); @@ -241,6 +240,80 @@ public void testShallowCloneNameAvailability() throws Exception { assertThat(ex.getMessage(), containsString("snapshot with the same name already exists")); } + public void testCloneAfterRepoShallowSettingEnabled() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assertEquals(getSnapshot(snapshotRepoName, snapshot).state(), SnapshotState.SUCCESS); + + // Updating the snapshot repository flag to enable shallow snapshots + createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + RepositoryMetadata updatedRepositoryMetadata = clusterAdmin().prepareGetRepositories(snapshotRepoName).get().repositories().get(0); + assertTrue(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false)); + + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(snapshotRepoName, snapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get()); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + assertEquals(getSnapshot(snapshotRepoName, targetSnapshot).isRemoteStoreIndexShallowCopyEnabled(), false); + } + + public void testCloneAfterRepoShallowSettingDisabled() throws Exception { + disableRepoConsistencyCheck("This test uses remote store repository"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assertEquals(getSnapshot(snapshotRepoName, snapshot).state(), SnapshotState.SUCCESS); + + // Updating the snapshot repository flag to enable shallow snapshots + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + RepositoryMetadata updatedRepositoryMetadata = clusterAdmin().prepareGetRepositories(snapshotRepoName).get().repositories().get(0); + assertFalse(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false)); + + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(snapshotRepoName, snapshot, targetSnapshot, indexName, remoteStoreEnabledIndexName).get()); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2); + assertEquals(getSnapshot(snapshotRepoName, targetSnapshot).isRemoteStoreIndexShallowCopyEnabled(), true); + } + public void testClonePreventsSnapshotDelete() throws Exception { final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java index d9be8af07fa1e..8cb9fd3cd3c63 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java @@ -425,8 +425,8 @@ public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, lo commitGeneration, startTime, time, - 0, - 0, + totalFileCount, + totalSize, indexUUID, remoteStoreRepository, repositoryBasePath, diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java index e0bf67167ebc7..eefee8ae1036b 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java @@ -50,18 +50,17 @@ String getLockPrefix() { return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR; } - List getLocksForAcquirer(String[] lockFiles) { + String getLockForAcquirer(String[] lockFiles) { if (acquirerId == null || acquirerId.isBlank()) { throw new IllegalArgumentException("Acquirer ID should be provided"); } List locksForAcquirer = Arrays.stream(lockFiles) .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) .collect(Collectors.toList()); - assert locksForAcquirer.size() == 1 : "Multiple lock files found for acquirer"; - return locksForAcquirer; - return Arrays.stream(lockFiles) - .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) - .collect(Collectors.toList()); + if (locksForAcquirer.size() != 1) { + throw new IllegalStateException("Expected single lock file but found [" + locksForAcquirer.size() + "] lock files"); + } + return locksForAcquirer.get(0); } public static LockInfoBuilder getLockInfoBuilder() { diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java index 0859a7bc60c21..9eb066d9e955e 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -40,7 +40,9 @@ public interface RemoteStoreLockManager { Boolean isAcquired(LockInfo lockInfo) throws IOException; /** - * + * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo. + * There can occur a race condition where the original file is deleted before we can use it to acquire lock for the new acquirer. Until we have a + * fix on LockManager side, Implementors must ensure thread safety for this operation. * @param originalLockInfo lock info instance for original lock. * @param clonedLockInfo lock info instance for which lock needs to be cloned. * @throws IOException throws IOException if originalResource itself do not have any lock. diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 67060d2519840..552cedc97df90 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -16,7 +16,7 @@ import java.io.IOException; import java.util.Collection; -import java.util.List; +import java.util.Objects; /** * A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to @@ -59,16 +59,8 @@ public void acquire(LockInfo lockInfo) throws IOException { public void release(LockInfo lockInfo) throws IOException { assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; String[] lockFiles = lockDirectory.listAll(); - - // ideally there should be only one lock per acquirer, but just to handle any stale locks, - // we try to release all the locks for the acquirer. - List locksToRelease = ((FileLockInfo) lockInfo).getLocksForAcquirer(lockFiles); - if (locksToRelease.size() > 1) { - logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId()); - } - for (String lock : locksToRelease) { - lockDirectory.deleteFile(lock); - } + String lockToRelease = ((FileLockInfo) lockInfo).getLockForAcquirer(lockFiles); + lockDirectory.deleteFile(lockToRelease); } /** @@ -85,7 +77,9 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { } /** - * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo + * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo. + * Snapshot layer enforces thread safety by having checks in place to ensure that the source snapshot is not being deleted before proceeding + * with the clone operation. Hence, the original lock file would always be present while acquiring the lock for cloned snapshot. * @param originalLockInfo lock info instance for original lock. * @param clonedLockInfo lock info instance for which lock needs to be cloned. * @throws IOException throws IOException if originalResource itself do not have any lock. @@ -94,11 +88,11 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { public void cloneLock(LockInfo originalLockInfo, LockInfo clonedLockInfo) throws IOException { assert originalLockInfo instanceof FileLockInfo : "originalLockInfo should be instance of FileLockInfo"; assert clonedLockInfo instanceof FileLockInfo : "clonedLockInfo should be instance of FileLockInfo"; - String originalResourceId = ((FileLockInfo) originalLockInfo).getAcquirerId(); - String clonedResourceId = ((FileLockInfo) clonedLockInfo).getAcquirerId(); + String originalResourceId = Objects.requireNonNull(((FileLockInfo) originalLockInfo).getAcquirerId()); + String clonedResourceId = Objects.requireNonNull(((FileLockInfo) clonedLockInfo).getAcquirerId()); assert originalResourceId != null && clonedResourceId != null : "provided resourceIds should not be null"; String[] lockFiles = lockDirectory.listAll(); - String lockNameForAcquirer = ((FileLockInfo) originalLockInfo).getLocksForAcquirer(lockFiles).get(0); + String lockNameForAcquirer = ((FileLockInfo) originalLockInfo).getLockForAcquirer(lockFiles); String fileToLockName = FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lockNameForAcquirer); acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(fileToLockName).withAcquirerId(clonedResourceId).build()); } diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 764f36df6d337..690d4ca0400b9 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -253,12 +253,21 @@ public void executeConsistentStateUpdate( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, + RepositoryData repositoryData, RepositoryShardId shardId, String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - in.cloneRemoteStoreIndexShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener); + in.cloneRemoteStoreIndexShardSnapshot( + source, + target, + repositoryData, + shardId, + shardGeneration, + remoteStoreLockManagerFactory, + listener + ); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 793b3d317e1bd..16bf7b36b9933 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -389,6 +389,7 @@ void cloneShardSnapshot( default void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, + RepositoryData repositoryData, RepositoryShardId shardId, @Nullable String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c4959a0c47783..79bb08cfb0580 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -588,6 +588,7 @@ public void cloneShardSnapshot( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, + RepositoryData repositoryData, RepositoryShardId shardId, @Nullable String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, @@ -601,32 +602,70 @@ public void cloneRemoteStoreIndexShardSnapshot( final int shardNum = shardId.shardId(); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); executor.execute(ActionRunnable.supply(listener, () -> { - final long startTime = threadPool.absoluteTimeInMillis(); - final BlobContainer shardContainer = shardContainer(index, shardNum); - // We don't need to check if there exists a shallow/full copy snapshot with the same name as we have the check before starting the clone - // operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone shallow snapshot would - // have a different UUID and hence a new unique shallow-snap-N file will be created. - RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadShallowCopyShardSnapshot(shardContainer, source); - - String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); - String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); - RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepository, - indexUUID, - String.valueOf(shardId.shardId()) - ); - remoteStoreMetadataLockManger.cloneLock( - FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), - FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() - ); - - REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( - remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), - shardContainer, - target.getUUID(), - compress - ); - return shardGeneration; + // We don't need to check if there exists a shallow/full copy snapshot with the same name as we have the check before starting + // the clone operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone shallow + // snapshot + // would have a different UUID and hence a new unique shallow-snap-N file will be created. + try { + final long startTime = threadPool.relativeTimeInMillis(); + final BlobContainer shardContainer = shardContainer(index, shardNum); + RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadShallowCopyShardSnapshot(shardContainer, source); + String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); + String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepository, + indexUUID, + String.valueOf(shardId.shardId()) + ); + remoteStoreMetadataLockManger.cloneLock( + FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), + FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() + ); + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compressor + ); + return shardGeneration; + } catch (Exception e) { + logger.info("Exception caught!"); + List indices = this.getSnapshotInfo(source).indices(); + List indexIds = repositoryData.resolveIndices(indices); + for (IndexId indexId : indexIds) { + logger.info("IndexId: {}", indexId); + IndexMetadata indexMetadata; + try { + indexMetadata = this.getSnapshotIndexMetaData(repositoryData, source, indexId); + } catch (Exception ex) { + continue; + } + if (indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { + int numberOfShards = indexMetadata.getNumberOfShards(); + for (int shard = 0; shard < numberOfShards; shard++) { + logger.info("Trying to release lock for {} {}", indexId, shard); + final int finalShardId = shard; + String indexUUID = indexMetadata.getIndexUUID(); + String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepoForIndex, + indexUUID, + String.valueOf(finalShardId) + ); + try { + remoteStoreMetadataLockManager.release( + FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() + ); + } catch (Exception ex) { + logger.info("IGNORED EXCEPTION"); + // ignoring all exceptions while cleaning up lock files. Uncleaned files will be taken care of either during + // delete/cleanup operation. + } + } + } + } + throw e; + } })); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 1619f36738f7b..6bdbcfee29a9a 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -532,6 +532,7 @@ public Boolean includeGlobalState() { return includeGlobalState; } + @Nullable public Boolean isRemoteStoreIndexShallowCopyEnabled() { return remoteStoreIndexShallowCopy; } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 1410fcda53e76..d9b250909e09a 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -91,7 +91,6 @@ import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; @@ -628,7 +627,10 @@ public ClusterState execute(ClusterState currentState) { } } updatedEntry = cloneEntry.withClones(clonesBuilder) - .withRemoteStoreIndexShallowCopy(snapshotInfoListener.result().isRemoteStoreIndexShallowCopyEnabled());; + .withRemoteStoreIndexShallowCopy( + Boolean.TRUE.equals(snapshotInfoListener.result().isRemoteStoreIndexShallowCopyEnabled()) + ); + ; updatedEntries.set(i, updatedEntry); changed = true; break; @@ -656,7 +658,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS continue; } final RepositoryShardId repoShardId = indexClone.getKey(); - runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository); + final boolean remoteStoreIndexShallowCopy = Boolean.TRUE.equals(updatedEntry.remoteStoreIndexShallowCopy()); + runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository, remoteStoreIndexShallowCopy); } } else { // Extremely unlikely corner case of cluster-manager failing over between between starting the clone and @@ -674,17 +677,25 @@ private void runReadyClone( SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore, RepositoryShardId repoShardId, - Repository repository + Repository repository, + boolean remoteStoreIndexShallowCopy ) { final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); executor.execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - logger.warn("Failed to get Repository data"); + logger.warn( + "Failed to get repository data while cloning shard [{}] from [{}] to [{}]", + repoShardId, + sourceSnapshot, + target.getSnapshotId() + ); + failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); } @Override - protected void doRun() throws Exception { + protected void doRun() { + final String localNodeId = clusterService.localNode().getId(); repository.getRepositoryData(ActionListener.wrap(repositoryData -> { try { final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData( @@ -692,12 +703,9 @@ protected void doRun() throws Exception { sourceSnapshot, repoShardId.index() ); - final boolean remoteStoreIndexShallowCopy = repository.getSnapshotInfo(sourceSnapshot) - .isRemoteStoreIndexShallowCopyEnabled(); final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy && indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); final SnapshotId targetSnapshot = target.getSnapshotId(); - final String localNodeId = clusterService.localNode().getId(); final ActionListener listener = ActionListener.wrap( generation -> innerUpdateSnapshotState( new ShardSnapshotUpdate( @@ -722,42 +730,8 @@ protected void doRun() throws Exception { ) ), e -> { - if (cloneRemoteStoreIndexShardSnapshot) { - final String indexUUID = indexMetadata.getIndexUUID(); - final String remoteStoreRepoForIndex = indexMetadata.getSettings() - .get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); - try { - remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepoForIndex, - indexUUID, - String.valueOf(repoShardId.shardId()) - ).release(FileLockInfo.getLockInfoBuilder().withAcquirerId(targetSnapshot.getUUID()).build()); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - innerUpdateSnapshotState( - new ShardSnapshotUpdate( - target, - repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) - ), - ActionListener.runBefore( - ActionListener.wrap( - v -> logger.trace( - "Marked [{}] as failed clone from [{}] to [{}]", - repoShardId, - sourceSnapshot, - targetSnapshot - ), - ex -> { - logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); - failAllListenersOnMasterFailOver(ex); - } - ), - () -> currentlyCloning.remove(repoShardId) - ) - ); + logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId); + failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); } ); if (currentlyCloning.add(repoShardId)) { @@ -765,6 +739,7 @@ protected void doRun() throws Exception { repository.cloneRemoteStoreIndexShardSnapshot( sourceSnapshot, targetSnapshot, + repositoryData, repoShardId, shardStatusBefore.generation(), remoteStoreLockManagerFactory, @@ -782,12 +757,34 @@ protected void doRun() throws Exception { } } catch (IOException e) { logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName()); + failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); } }, this::onFailure)); } }); } + private void failCloneShardAndUpdateClusterState(Snapshot target, SnapshotId sourceSnapshot, RepositoryShardId repoShardId) { + final String localNodeId = clusterService.localNode().getId(); + innerUpdateSnapshotState( + new ShardSnapshotUpdate( + target, + repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + ), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId, sourceSnapshot, target.getSnapshotId()), + ex -> { + logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(ex); + } + ), + () -> currentlyCloning.remove(repoShardId) + ) + ); + } + private void ensureBelowConcurrencyLimit( String repository, String name, @@ -3142,12 +3139,14 @@ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nul // this is a clone, see if new work is ready for (final Map.Entry clone : entry.clones().entrySet()) { if (clone.getValue().state() == ShardState.INIT) { + final boolean remoteStoreIndexShallowCopy = Boolean.TRUE.equals(entry.remoteStoreIndexShallowCopy()); runReadyClone( entry.snapshot(), entry.source(), clone.getValue(), clone.getKey(), - repositoriesService.repository(entry.repository()) + repositoriesService.repository(entry.repository()), + remoteStoreIndexShallowCopy ); } } diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java index 95af53cb6e5ec..383531be8141a 100644 --- a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java @@ -10,8 +10,6 @@ import org.opensearch.test.OpenSearchTestCase; -import java.util.List; - public class FileLockInfoTests extends OpenSearchTestCase { String testMetadata = "testMetadata"; String testAcquirerId = "testAcquirerId"; @@ -47,10 +45,7 @@ public void testGetLocksForAcquirer() { FileLockInfo.LockFileUtils.generateLockName(testMetadata, "acquirerId2") }; FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - assertEquals( - fileLockInfo.getLocksForAcquirer(locks), - List.of(FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)) - ); + assertEquals(fileLockInfo.getLockForAcquirer(locks), FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)); } } diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index edf5b6c84bc54..34e4a0fd4cc81 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -383,6 +383,7 @@ public void cloneShardSnapshot( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, + RepositoryData repositoryData, RepositoryShardId shardId, String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 67f3f7fc1f50c..ec4cee0ac685d 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -211,6 +211,7 @@ public void cloneShardSnapshot( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, + RepositoryData repositoryData, RepositoryShardId shardId, String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 3a77734b8e2c3..5f8d8ce653f06 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -532,9 +532,9 @@ protected Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) { .build(); } - protected Settings.Builder snapshotRepoSettingsForShallowCopy() { + protected Settings.Builder snapshotRepoSettingsForShallowCopy(Path path) { final Settings.Builder settings = Settings.builder(); - settings.put("location", randomRepoPath()); + settings.put("location", path); settings.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); return settings; } @@ -563,6 +563,7 @@ protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String rem return lockDirectory.listAll(); } } + /** * Adds a snapshot in state {@link SnapshotState#FAILED} to the given repository. * From fef5b5c2ed714f655efa89521219cd2afa0848ae Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Wed, 14 Jun 2023 14:57:23 -0700 Subject: [PATCH 7/9] Addressed PR comments Signed-off-by: Bansi Kasundra --- .../index/store/lockmanager/FileLockInfo.java | 7 +- .../RemoteStoreMetadataLockManager.java | 9 +- .../repositories/FilterRepository.java | 11 +-- .../opensearch/repositories/Repository.java | 1 - .../blobstore/BlobStoreRepository.java | 85 +++++-------------- .../snapshots/SnapshotsService.java | 2 +- .../store/lockmanager/FileLockInfoTests.java | 4 +- .../RepositoriesServiceTests.java | 1 - .../index/shard/RestoreOnlyRepository.java | 1 - 9 files changed, 40 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java index eefee8ae1036b..24f42743e1a04 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java @@ -8,6 +8,7 @@ package org.opensearch.index.store.lockmanager; +import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -50,13 +51,17 @@ String getLockPrefix() { return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR; } - String getLockForAcquirer(String[] lockFiles) { + String getLockForAcquirer(String[] lockFiles) throws NoSuchFileException { if (acquirerId == null || acquirerId.isBlank()) { throw new IllegalArgumentException("Acquirer ID should be provided"); } List locksForAcquirer = Arrays.stream(lockFiles) .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) .collect(Collectors.toList()); + + if (locksForAcquirer.isEmpty()) { + throw new NoSuchFileException("No lock file found for the acquirer: " + acquirerId); + } if (locksForAcquirer.size() != 1) { throw new IllegalStateException("Expected single lock file but found [" + locksForAcquirer.size() + "] lock files"); } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 552cedc97df90..35186e5bf5cce 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -15,6 +15,7 @@ import org.opensearch.index.store.RemoteBufferedOutputDirectory; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.Objects; @@ -59,8 +60,12 @@ public void acquire(LockInfo lockInfo) throws IOException { public void release(LockInfo lockInfo) throws IOException { assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; String[] lockFiles = lockDirectory.listAll(); - String lockToRelease = ((FileLockInfo) lockInfo).getLockForAcquirer(lockFiles); - lockDirectory.deleteFile(lockToRelease); + try { + String lockToRelease = ((FileLockInfo) lockInfo).getLockForAcquirer(lockFiles); + lockDirectory.deleteFile(lockToRelease); + } catch (NoSuchFileException e) { + // Ignoring if the file to be deleted is not present. + } } /** diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 690d4ca0400b9..764f36df6d337 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -253,21 +253,12 @@ public void executeConsistentStateUpdate( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, - RepositoryData repositoryData, RepositoryShardId shardId, String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - in.cloneRemoteStoreIndexShardSnapshot( - source, - target, - repositoryData, - shardId, - shardGeneration, - remoteStoreLockManagerFactory, - listener - ); + in.cloneRemoteStoreIndexShardSnapshot(source, target, shardId, shardGeneration, remoteStoreLockManagerFactory, listener); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 16bf7b36b9933..793b3d317e1bd 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -389,7 +389,6 @@ void cloneShardSnapshot( default void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, - RepositoryData repositoryData, RepositoryShardId shardId, @Nullable String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 79bb08cfb0580..adc0305575071 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -588,7 +588,6 @@ public void cloneShardSnapshot( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, - RepositoryData repositoryData, RepositoryShardId shardId, @Nullable String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, @@ -602,70 +601,30 @@ public void cloneRemoteStoreIndexShardSnapshot( final int shardNum = shardId.shardId(); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); executor.execute(ActionRunnable.supply(listener, () -> { + final long startTime = threadPool.relativeTimeInMillis(); + final BlobContainer shardContainer = shardContainer(index, shardNum); // We don't need to check if there exists a shallow/full copy snapshot with the same name as we have the check before starting // the clone operation ensuring that the snapshot name is available by checking the repository data. Also, the new clone shallow - // snapshot - // would have a different UUID and hence a new unique shallow-snap-N file will be created. - try { - final long startTime = threadPool.relativeTimeInMillis(); - final BlobContainer shardContainer = shardContainer(index, shardNum); - RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadShallowCopyShardSnapshot(shardContainer, source); - String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); - String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); - RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepository, - indexUUID, - String.valueOf(shardId.shardId()) - ); - remoteStoreMetadataLockManger.cloneLock( - FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), - FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() - ); - REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( - remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), - shardContainer, - target.getUUID(), - compressor - ); - return shardGeneration; - } catch (Exception e) { - logger.info("Exception caught!"); - List indices = this.getSnapshotInfo(source).indices(); - List indexIds = repositoryData.resolveIndices(indices); - for (IndexId indexId : indexIds) { - logger.info("IndexId: {}", indexId); - IndexMetadata indexMetadata; - try { - indexMetadata = this.getSnapshotIndexMetaData(repositoryData, source, indexId); - } catch (Exception ex) { - continue; - } - if (indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { - int numberOfShards = indexMetadata.getNumberOfShards(); - for (int shard = 0; shard < numberOfShards; shard++) { - logger.info("Trying to release lock for {} {}", indexId, shard); - final int finalShardId = shard; - String indexUUID = indexMetadata.getIndexUUID(); - String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); - RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepoForIndex, - indexUUID, - String.valueOf(finalShardId) - ); - try { - remoteStoreMetadataLockManager.release( - FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() - ); - } catch (Exception ex) { - logger.info("IGNORED EXCEPTION"); - // ignoring all exceptions while cleaning up lock files. Uncleaned files will be taken care of either during - // delete/cleanup operation. - } - } - } - } - throw e; - } + // snapshot would have a different UUID and hence a new unique shallow-snap-N file will be created. + RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = loadShallowCopyShardSnapshot(shardContainer, source); + String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); + String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); + RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepository, + indexUUID, + String.valueOf(shardId.shardId()) + ); + remoteStoreMetadataLockManger.cloneLock( + FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), + FileLockInfo.getLockInfoBuilder().withAcquirerId(target.getUUID()).build() + ); + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + remStoreBasedShardMetadata.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), + shardContainer, + target.getUUID(), + compressor + ); + return shardGeneration; })); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index d9b250909e09a..0a455c376f62d 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -739,7 +739,6 @@ protected void doRun() { repository.cloneRemoteStoreIndexShardSnapshot( sourceSnapshot, targetSnapshot, - repositoryData, repoShardId, shardStatusBefore.generation(), remoteStoreLockManagerFactory, @@ -765,6 +764,7 @@ protected void doRun() { } private void failCloneShardAndUpdateClusterState(Snapshot target, SnapshotId sourceSnapshot, RepositoryShardId repoShardId) { + // Stale blobs/lock-files will be cleaned up during delete/cleanup operation. final String localNodeId = clusterService.localNode().getId(); innerUpdateSnapshotState( new ShardSnapshotUpdate( diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java index 383531be8141a..f3a2f1859923e 100644 --- a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java @@ -10,6 +10,8 @@ import org.opensearch.test.OpenSearchTestCase; +import java.nio.file.NoSuchFileException; + public class FileLockInfoTests extends OpenSearchTestCase { String testMetadata = "testMetadata"; String testAcquirerId = "testAcquirerId"; @@ -39,7 +41,7 @@ public void testGetLockPrefixFailureCase() { assertThrows(IllegalArgumentException.class, fileLockInfo::getLockPrefix); } - public void testGetLocksForAcquirer() { + public void testGetLocksForAcquirer() throws NoSuchFileException { String[] locks = new String[] { FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId), FileLockInfo.LockFileUtils.generateLockName(testMetadata, "acquirerId2") }; diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 34e4a0fd4cc81..edf5b6c84bc54 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -383,7 +383,6 @@ public void cloneShardSnapshot( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, - RepositoryData repositoryData, RepositoryShardId shardId, String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index ec4cee0ac685d..67f3f7fc1f50c 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -211,7 +211,6 @@ public void cloneShardSnapshot( public void cloneRemoteStoreIndexShardSnapshot( SnapshotId source, SnapshotId target, - RepositoryData repositoryData, RepositoryShardId shardId, String shardGeneration, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, From 5cc46cba22a2e11e6aa727b811bcf00f2a395bf9 Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Fri, 16 Jun 2023 19:23:10 -0700 Subject: [PATCH 8/9] Addressed PR comment Signed-off-by: Bansi Kasundra --- .../store/lockmanager/RemoteStoreMetadataLockManager.java | 2 ++ .../repositories/blobstore/BlobStoreRepository.java | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 35186e5bf5cce..fd7906729e314 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -49,6 +49,7 @@ public void acquire(LockInfo lockInfo) throws IOException { /** * Releases Locks acquired by a given acquirer which is passed in LockInfo Instance. + * If the lock file doesn't exist for the acquirer, release will be a no-op. * Right now this method is only used to release locks for a given acquirer, * This can be extended in future to handle other cases as well, like: * - release lock for given fileToLock and AcquirerId @@ -65,6 +66,7 @@ public void release(LockInfo lockInfo) throws IOException { lockDirectory.deleteFile(lockToRelease); } catch (NoSuchFileException e) { // Ignoring if the file to be deleted is not present. + logger.info("No lock file found for acquirerId: {}", ((FileLockInfo) lockInfo).getAcquirerId()); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index adc0305575071..be5fbf2ab6a51 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -572,13 +572,13 @@ public void cloneShardSnapshot( sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), shardContainer, target.getUUID(), - compress + compressor ); INDEX_SHARD_SNAPSHOTS_FORMAT.write( existingSnapshots.withClone(source.getName(), target.getName()), shardContainer, newGen, - compress + compressor ); return newGen; })); From 5a7b3ee3e25a36c2bcb1ddd61d89c323ff6147fc Mon Sep 17 00:00:00 2001 From: Bansi Kasundra Date: Thu, 6 Jul 2023 12:13:39 -0700 Subject: [PATCH 9/9] Addressed PR comments Signed-off-by: Bansi Kasundra --- .../main/java/org/opensearch/cluster/SnapshotsInProgress.java | 3 --- .../opensearch/snapshots/AbstractSnapshotIntegTestCase.java | 3 --- 2 files changed, 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index a121f2182f158..ffc99c34fcac5 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -455,9 +455,6 @@ public Entry withClones(final Map update } public Entry withRemoteStoreIndexShallowCopy(final boolean remoteStoreIndexShallowCopy) { - if (remoteStoreIndexShallowCopy == this.remoteStoreIndexShallowCopy) { - return this; - } return new Entry( snapshot, includeGlobalState, diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 5f8d8ce653f06..83051c7fed4e4 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -58,7 +58,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; @@ -128,8 +127,6 @@ protected Settings nodeSettings(int nodeOrdinal) { // Rebalancing is causing some checks after restore to randomly fail // due to https://github.com/elastic/elasticsearch/issues/9421 .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - .put(FeatureFlags.REMOTE_STORE, "true") .build(); }