Skip to content

Commit

Permalink
Merge branch '2.x' of github.com:opensearch-project/OpenSearch into b…
Browse files Browse the repository at this point in the history
…acport2.x-validations
  • Loading branch information
bharath-techie committed Sep 3, 2024
2 parents c040b41 + 62e943c commit 4cfeff7
Show file tree
Hide file tree
Showing 21 changed files with 1,196 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralized snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))
- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -424,6 +425,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
Expand All @@ -436,6 +438,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;

/**
* Transport action for create snapshot operation
*
Expand All @@ -56,12 +60,15 @@
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;

private final RepositoriesService repositoriesService;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
indexNameExpressionResolver
);
this.snapshotsService = snapshotsService;
this.repositoriesService = repositoriesService;
}

@Override
Expand Down Expand Up @@ -110,7 +118,10 @@ protected void clusterManagerOperation(
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
} else {
if (request.waitForCompletion()) {
Repository repository = repositoriesService.repository(request.repository());
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());

if (request.waitForCompletion() || isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Priority;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -123,6 +124,29 @@ public void finalizeSnapshot(
);
}

/**
* Pass-through implementation of the priority-aware {@code finalizeSnapshot} overload: every argument, including
* {@code repositoryUpdatePriority}, is forwarded unchanged to the delegate {@code in}.
*/
@Override
public void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
// No filtering or transformation happens here; the delegate decides how the priority is used.
in.finalizeSnapshot(
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}

@Override
public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/repositories/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -175,6 +176,32 @@ void finalizeSnapshot(
ActionListener<RepositoryData> listener
);

/**
* Finalizes the snapshotting process, with an explicit priority for the cluster state update task.
* <p>
* This method is called on cluster-manager after all shards are snapshotted.
*
* @param shardGenerations updated shard generations
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
*/
void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
);

/**
* Deletes snapshots
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Numbers;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -267,6 +268,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

public static final Setting<Boolean> REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false);

/**
* Boolean setting {@code shallow_snapshot_v2}, read from a repository's settings; defaults to {@code false}.
*/
public static final Setting<Boolean> SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false);

/**
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
Expand Down Expand Up @@ -1072,6 +1075,7 @@ private void doDeleteShardSnapshots(
repositoryStateId,
repoMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
);
}, listener::onFailure);
Expand Down Expand Up @@ -1101,39 +1105,46 @@ private void doDeleteShardSnapshots(
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
2
);
cleanupUnlinkedRootAndIndicesBlobs(
snapshotIds,
foundIndices,
rootBlobs,
newRepoData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(
snapshotIds,
repositoryData,
false,
remoteStoreLockManagerFactory,
writeMetaAndComputeDeletesStep
);
writeMetaAndComputeDeletesStep.whenComplete(
deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
writeIndexGen(
updatedRepoData,
repositoryStateId,
repoMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(newRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
2
);
cleanupUnlinkedRootAndIndicesBlobs(
snapshotIds,
deleteResults,
foundIndices,
rootBlobs,
newRepoData,
remoteStoreLockManagerFactory,
afterCleanupsListener
),
afterCleanupsListener::onFailure
);
}, listener::onFailure));
);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(
snapshotIds,
repositoryData,
false,
remoteStoreLockManagerFactory,
writeMetaAndComputeDeletesStep
);
writeMetaAndComputeDeletesStep.whenComplete(
deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
snapshotIds,
deleteResults,
remoteStoreLockManagerFactory,
afterCleanupsListener
),
afterCleanupsListener::onFailure
);
}, listener::onFailure)
);
}
}

Expand Down Expand Up @@ -1583,6 +1594,7 @@ public void cleanup(
repositoryStateId,
repositoryMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(
v -> cleanupStaleBlobs(
Collections.emptyList(),
Expand Down Expand Up @@ -1787,6 +1799,29 @@ public void finalizeSnapshot(
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
final ActionListener<RepositoryData> listener
) {
finalizeSnapshot(
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
Priority.NORMAL,
listener
);
}

@Override
public void finalizeSnapshot(
final ShardGenerations shardGenerations,
final long repositoryStateId,
final Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
final ActionListener<RepositoryData> listener
) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received ["
+ repositoryStateId
Expand Down Expand Up @@ -1834,6 +1869,7 @@ public void finalizeSnapshot(
repositoryStateId,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
ActionListener.wrap(newRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
Expand Down Expand Up @@ -2367,17 +2403,19 @@ public boolean isSystemRepository() {
* Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus
* pending and safe generation are set to the same value marking the end of the update of the repository data.
*
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param version version of the repository metadata to write
* @param stateFilter filter for the last cluster state update executed by this method
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param version version of the repository metadata to write
* @param stateFilter filter for the last cluster state update executed by this method
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener completion listener
*/
protected void writeIndexGen(
RepositoryData repositoryData,
long expectedGen,
Version version,
Function<ClusterState, ClusterState> stateFilter,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
assert isReadOnly() == false; // can not write to a read only repository
Expand All @@ -2402,7 +2440,7 @@ protected void writeIndexGen(
final StepListener<Long> setPendingStep = new StepListener<>();
clusterService.submitStateUpdateTask(
"set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
new ClusterStateUpdateTask() {
new ClusterStateUpdateTask(repositoryUpdatePriority) {

private long newGen;

Expand Down Expand Up @@ -2540,7 +2578,7 @@ public void onFailure(Exception e) {
// Step 3: Update CS to reflect new repository generation.
clusterService.submitStateUpdateTask(
"set safe repository generation [" + metadata.name() + "][" + newGen + "]",
new ClusterStateUpdateTask() {
new ClusterStateUpdateTask(repositoryUpdatePriority) {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetadata meta = getRepoMetadata(currentState);
Expand Down
Loading

0 comments on commit 4cfeff7

Please sign in to comment.