Skip to content

Commit

Permalink
Optimise clone operation for incremental full cluster snapshots (#16296
Browse files Browse the repository at this point in the history
…) (#16303)

* Optimise clone operation for incremental full cluster snapshots



* Add UTs



* Add CHANGELOG



---------

Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Daniel Widdis <[email protected]>
Co-authored-by: Daniel Widdis <[email protected]>
  • Loading branch information
ashking94 and dbwiddis authored Oct 14, 2024
1 parent 02f4e6d commit 0a34a97
Show file tree
Hide file tree
Showing 3 changed files with 504 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Remove Identity FeatureFlag ([#16024](https://github.com/opensearch-project/OpenSearch/pull/16024))
- Ensure RestHandler.Wrapper delegates all implementations to the wrapped handler ([#16154](https://github.com/opensearch-project/OpenSearch/pull/16154))
- Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296))
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273))


### Deprecated

### Removed
Expand Down
121 changes: 64 additions & 57 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());

private void runReadyClone(
// Made to package private to be able to test the method in UTs
void runReadyClone(
Snapshot target,
SnapshotId sourceSnapshot,
ShardSnapshotStatus shardStatusBefore,
Expand All @@ -1534,69 +1535,75 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
final String localNodeId = clusterService.localNode().getId();
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
if (remoteStoreIndexShallowCopy == false) {
executeClone(localNodeId, false);
} else {
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
sourceSnapshot,
repoShardId.index()
);
final boolean cloneRemoteStoreIndexShardSnapshot = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
executeClone(localNodeId, cloneRemoteStoreIndexShardSnapshot);
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
}
}

private void executeClone(String localNodeId, boolean cloneRemoteStoreIndexShardSnapshot) {
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
repoShardId.index()
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
getCloneCompletionListener(localNodeId)
);
final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy
&& indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
final SnapshotId targetSnapshot = target.getSnapshotId();
final ActionListener<String> listener = ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(
target,
repoShardId,
new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)
),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
sourceSnapshot,
targetSnapshot
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
getCloneCompletionListener(localNodeId)
);
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
targetSnapshot,
}
}
}

private ActionListener<String> getCloneCompletionListener(String localNodeId) {
return ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(target, repoShardId, new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
listener
);
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
targetSnapshot,
repoShardId,
shardStatusBefore.generation(),
listener
);
}
}
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
target.getSnapshotId()
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
);
}
});
}
Expand Down
Loading

0 comments on commit 0a34a97

Please sign in to comment.