Skip to content

Commit 4fea145

Browse files
gbbafnadk2k
authored and committed
[Snapshot V2] Move timestamp pinning before cluster state update (opensearch-project#16269)
* Move timestamp pinning before cluster state update

Signed-off-by: Gaurav Bafna <[email protected]>

* Address PR Comments

Signed-off-by: Gaurav Bafna <[email protected]>

* Fix IT

Signed-off-by: Gaurav Bafna <[email protected]>

---------

Signed-off-by: Gaurav Bafna <[email protected]>
1 parent 4b9a024 commit 4fea145

File tree

2 files changed

+80
-96
lines changed

2 files changed

+80
-96
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java

+11-16
Original file line number | Diff line number | Diff line change
@@ -938,17 +938,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
938938
Thread thread = new Thread(() -> {
939939
try {
940940
String snapshotName = "snapshot-concurrent-" + snapshotIndex;
941-
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
942-
.cluster()
943-
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
944-
.setWaitForCompletion(true)
945-
.get();
946-
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
947-
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
948-
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
949-
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
950-
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
951-
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
941+
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get();
942+
logger.info("Snapshot completed {}", snapshotName);
952943
} catch (Exception e) {}
953944
});
954945
threads.add(thread);
@@ -963,15 +954,19 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
963954
thread.join();
964955
}
965956

966-
// Validate that only one snapshot has been created
957+
// Sleeping 10 sec for earlier created snapshot to complete runNextQueuedOperation and be ready for next snapshot
958+
// We can't put `waitFor` since we don't have visibility on its completion
959+
Thread.sleep(TimeValue.timeValueSeconds(10).seconds());
960+
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, "snapshot-cleanup-timestamp").setWaitForCompletion(true).get();
967961
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
968962
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
969963
repository.getRepositoryData(repositoryDataPlainActionFuture);
970-
971964
RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
972-
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
973-
forceSyncPinnedTimestamps();
974-
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size());
965+
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(2));
966+
waitUntil(() -> {
967+
forceSyncPinnedTimestamps();
968+
return RemoteStorePinnedTimestampService.getPinnedEntities().size() == repositoryData.getSnapshotIds().size();
969+
});
975970
}
976971

977972
public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

+69-80
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,7 @@
7979
import org.opensearch.cluster.service.ClusterService;
8080
import org.opensearch.common.Nullable;
8181
import org.opensearch.common.Priority;
82+
import org.opensearch.common.SetOnce;
8283
import org.opensearch.common.UUIDs;
8384
import org.opensearch.common.collect.Tuple;
8485
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
@@ -466,34 +467,35 @@ public TimeValue timeout() {
466467
* @param listener snapshot creation listener
467468
*/
468469
public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
469-
long pinnedTimestamp = System.currentTimeMillis();
470470
final String repositoryName = request.repository();
471471
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
472+
validate(repositoryName, snapshotName);
473+
474+
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
475+
Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
476+
long pinnedTimestamp = System.currentTimeMillis();
477+
try {
478+
updateSnapshotPinnedTimestamp(snapshot, pinnedTimestamp);
479+
} catch (Exception e) {
480+
listener.onFailure(e);
481+
return;
482+
}
472483

473484
Repository repository = repositoriesService.repository(repositoryName);
474-
validate(repositoryName, snapshotName);
475485
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
476486
private SnapshotsInProgress.Entry newEntry;
477-
478-
private SnapshotId snapshotId;
479-
480-
private Snapshot snapshot;
481-
482487
boolean enteredLoop;
483488

484489
@Override
485490
public ClusterState execute(ClusterState currentState) {
486491
// move to in progress
487-
snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
488492
Repository repository = repositoriesService.repository(repositoryName);
489-
490493
if (repository.isReadOnly()) {
491494
listener.onFailure(
492495
new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository")
493496
);
494497
}
495498

496-
snapshot = new Snapshot(repositoryName, snapshotId);
497499
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());
498500

499501
createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName);
@@ -593,59 +595,46 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
593595
pinnedTimestamp
594596
);
595597
final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null);
596-
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
597-
pinnedTimestampListener.whenComplete(repoData -> {
598-
repository.finalizeSnapshot(
599-
shardGenerations,
600-
repositoryData.getGenId(),
601-
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
602-
snapshotInfo,
603-
version,
604-
state -> stateWithoutSnapshot(state, snapshot),
605-
Priority.IMMEDIATE,
606-
new ActionListener<RepositoryData>() {
607-
@Override
608-
public void onResponse(RepositoryData repositoryData) {
609-
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
610-
leaveRepoLoop(repositoryName);
611-
failSnapshotCompletionListeners(
612-
snapshot,
613-
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
614-
);
615-
listener.onFailure(
616-
new SnapshotException(
617-
repositoryName,
618-
snapshotName,
619-
"Aborting snapshot-v2, no longer cluster manager"
620-
)
621-
);
622-
return;
623-
}
624-
listener.onResponse(snapshotInfo);
625-
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
626-
// can get queued . This is triggering them.
627-
runNextQueuedOperation(repositoryData, repositoryName, true);
628-
cleanOrphanTimestamp(repositoryName, repositoryData);
629-
}
630-
631-
@Override
632-
public void onFailure(Exception e) {
633-
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
598+
repository.finalizeSnapshot(
599+
shardGenerations,
600+
repositoryData.getGenId(),
601+
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
602+
snapshotInfo,
603+
version,
604+
state -> stateWithoutSnapshot(state, snapshot),
605+
Priority.IMMEDIATE,
606+
new ActionListener<RepositoryData>() {
607+
@Override
608+
public void onResponse(RepositoryData repositoryData) {
609+
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
634610
leaveRepoLoop(repositoryName);
635-
// cleaning up in progress snapshot here
636-
stateWithoutSnapshotV2(newState);
637-
listener.onFailure(e);
611+
failSnapshotCompletionListeners(
612+
snapshot,
613+
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
614+
);
615+
listener.onFailure(
616+
new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager")
617+
);
618+
return;
638619
}
620+
listener.onResponse(snapshotInfo);
621+
logger.info("created snapshot-v2 [{}] in repository [{}]", repositoryName, snapshotName);
622+
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
623+
// can get queued . This is triggering them.
624+
runNextQueuedOperation(repositoryData, repositoryName, true);
625+
cleanOrphanTimestamp(repositoryName, repositoryData);
639626
}
640-
);
641-
}, e -> {
642-
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} {} ", repositoryName, snapshotName, e);
643-
leaveRepoLoop(repositoryName);
644-
// cleaning up in progress snapshot here
645-
stateWithoutSnapshotV2(newState);
646-
listener.onFailure(e);
647-
});
648-
updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener);
627+
628+
@Override
629+
public void onFailure(Exception e) {
630+
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
631+
leaveRepoLoop(repositoryName);
632+
// cleaning up in progress snapshot here
633+
stateWithoutSnapshotV2(newState);
634+
listener.onFailure(e);
635+
}
636+
}
637+
);
649638
}
650639

651640
@Override
@@ -733,30 +722,30 @@ private void createSnapshotPreValidations(
733722
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
734723
}
735724

736-
private void updateSnapshotPinnedTimestamp(
737-
RepositoryData repositoryData,
738-
Snapshot snapshot,
739-
long timestampToPin,
740-
ActionListener<RepositoryData> listener
741-
) {
725+
private void updateSnapshotPinnedTimestamp(Snapshot snapshot, long timestampToPin) throws Exception {
726+
CountDownLatch latch = new CountDownLatch(1);
727+
SetOnce<Exception> ex = new SetOnce<>();
728+
ActionListener<Void> listener = new ActionListener<>() {
729+
@Override
730+
public void onResponse(Void unused) {
731+
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
732+
}
733+
734+
@Override
735+
public void onFailure(Exception e) {
736+
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
737+
ex.set(e);
738+
}
739+
};
742740
remoteStorePinnedTimestampService.pinTimestamp(
743741
timestampToPin,
744742
getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()),
745-
new ActionListener<Void>() {
746-
@Override
747-
public void onResponse(Void unused) {
748-
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
749-
listener.onResponse(repositoryData);
750-
}
751-
752-
@Override
753-
public void onFailure(Exception e) {
754-
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
755-
listener.onFailure(e);
756-
757-
}
758-
}
743+
new LatchedActionListener<>(listener, latch)
759744
);
745+
latch.await();
746+
if (ex.get() != null) {
747+
throw ex.get();
748+
}
760749
}
761750

762751
public static String getPinningEntity(String repositoryName, String snapshotUUID) {

0 commit comments

Comments
 (0)