Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snapshot V2] Move timestamp pinning before cluster state update #16269

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -938,17 +938,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
Thread thread = new Thread(() -> {
try {
String snapshotName = "snapshot-concurrent-" + snapshotIndex;
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get();
logger.info("Snapshot completed {}", snapshotName);
} catch (Exception e) {}
});
threads.add(thread);
Expand All @@ -963,15 +954,19 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
thread.join();
}

// Validate that only one snapshot has been created
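// Sleep so that the earlier snapshot can complete runNextQueuedOperation and be ready for the next snapshot.
// We can't use `waitFor` here since we have no visibility into its completion.
// NOTE(review): Thread.sleep takes milliseconds; TimeValue#seconds() presumably returns 10 here, so this may sleep ~10 ms rather than the intended 10 sec - confirm.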
Thread.sleep(TimeValue.timeValueSeconds(10).seconds());
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, "snapshot-cleanup-timestamp").setWaitForCompletion(true).get();
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
forceSyncPinnedTimestamps();
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size());
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(2));
waitUntil(() -> {
forceSyncPinnedTimestamps();
return RemoteStorePinnedTimestampService.getPinnedEntities().size() == repositoryData.getSnapshotIds().size();
});
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {
Expand Down
149 changes: 69 additions & 80 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand Down Expand Up @@ -466,34 +467,35 @@
* @param listener snapshot creation listener
*/
public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
long pinnedTimestamp = System.currentTimeMillis();
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);

final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
long pinnedTimestamp = System.currentTimeMillis();
try {
updateSnapshotPinnedTimestamp(snapshot, pinnedTimestamp);
} catch (Exception e) {
listener.onFailure(e);
return;

Check warning on line 481 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L479-L481

Added lines #L479 - L481 were not covered by tests
}

Repository repository = repositoriesService.repository(repositoryName);
validate(repositoryName, snapshotName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;

private SnapshotId snapshotId;

private Snapshot snapshot;

boolean enteredLoop;

@Override
public ClusterState execute(ClusterState currentState) {
// move to in progress
snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Repository repository = repositoriesService.repository(repositoryName);

if (repository.isReadOnly()) {
listener.onFailure(
new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository")
);
}

snapshot = new Snapshot(repositoryName, snapshotId);
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());

createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName);
Expand Down Expand Up @@ -593,59 +595,46 @@
pinnedTimestamp
);
final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null);
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
snapshotInfo,
version,
state -> stateWithoutSnapshot(state, snapshot),
Priority.IMMEDIATE,
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
);
listener.onFailure(
new SnapshotException(
repositoryName,
snapshotName,
"Aborting snapshot-v2, no longer cluster manager"
)
);
return;
}
listener.onResponse(snapshotInfo);
// For snapshot-v2, we don't allow concurrent snapshots. Meanwhile, non-v2 snapshot operations
// can get queued; this call triggers them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
snapshotInfo,
version,
state -> stateWithoutSnapshot(state, snapshot),
Priority.IMMEDIATE,
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
failSnapshotCompletionListeners(

Check warning on line 611 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L611

Added line #L611 was not covered by tests
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
);
listener.onFailure(

Check warning on line 615 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L615

Added line #L615 was not covered by tests
new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager")
);
return;

Check warning on line 618 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L618

Added line #L618 was not covered by tests
}
listener.onResponse(snapshotInfo);
logger.info("created snapshot-v2 [{}] in repository [{}]", repositoryName, snapshotName);
// For snapshot-v2, we don't allow concurrent snapshots. Meanwhile, non-v2 snapshot operations
// can get queued; this call triggers them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} {} ", repositoryName, snapshotName, e);
leaveRepoLoop(repositoryName);
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
});
updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener);

@Override
public void onFailure(Exception e) {
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
leaveRepoLoop(repositoryName);

Check warning on line 631 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L630-L631

Added lines #L630 - L631 were not covered by tests
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
}

Check warning on line 635 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L633-L635

Added lines #L633 - L635 were not covered by tests
}
);
}

@Override
Expand Down Expand Up @@ -733,30 +722,30 @@
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
}

private void updateSnapshotPinnedTimestamp(
RepositoryData repositoryData,
Snapshot snapshot,
long timestampToPin,
ActionListener<RepositoryData> listener
) {
private void updateSnapshotPinnedTimestamp(Snapshot snapshot, long timestampToPin) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
SetOnce<Exception> ex = new SetOnce<>();
ActionListener<Void> listener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
ex.set(e);
}

Check warning on line 738 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L736-L738

Added lines #L736 - L738 were not covered by tests
};
remoteStorePinnedTimestampService.pinTimestamp(
timestampToPin,
getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()),
new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
listener.onResponse(repositoryData);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
listener.onFailure(e);

}
}
new LatchedActionListener<>(listener, latch)
);
latch.await();
if (ex.get() != null) {
throw ex.get();

Check warning on line 747 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L747

Added line #L747 was not covered by tests
}
}

public static String getPinningEntity(String repositoryName, String snapshotUUID) {
Expand Down
Loading