Fix Shallow copy snapshot failures on closed index #16868

Merged 7 commits on Jan 9, 2025
Changes from 6 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))
- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))

### Security

@@ -39,6 +39,9 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -1078,4 +1081,67 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms"));

assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("Create shallow snapshot setting enabled repo");
String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
Path shallowSnapshotRepoPath = randomRepoPath();
Settings.Builder settings = Settings.builder()
.put("location", shallowSnapshotRepoPath)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE);
createRepository(shallowSnapshotRepoName, "fs", settings);

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}
flushAndRefresh(INDEX_NAME);

logger.info("Verify shallow snapshot created before close");
final String snapshot1 = "snapshot1";
SnapshotInfo snapshotInfo1 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state());
assertTrue(snapshotInfo1.successfulShards() > 0);
assertEquals(0, snapshotInfo1.failedShards());

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}

// close index
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(1000);
logger.info("Verify shallow snapshot created after close");
final String snapshot2 = "snapshot2";

SnapshotInfo snapshotInfo2 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state());
assertTrue(snapshotInfo2.successfulShards() > 0);
assertEquals(0, snapshotInfo2.failedShards());
}
}
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1624,6 +1624,22 @@
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}

/**
* Fetches the last remote uploaded segment metadata file
* @return {@link RemoteSegmentMetadata}
* @throws IOException
*/
public RemoteSegmentMetadata fetchLastRemoteUploadedSegmentMetadata() throws IOException {
if (!indexSettings.isAssignedOnRemoteNode()) {
throw new IllegalStateException("Index is not assigned on Remote Node");
}
RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile();
if (lastUploadedMetadata == null) {
throw new FileNotFoundException("No metadata file found in remote store");
}
return lastUploadedMetadata;
}

/**
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
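The new `fetchLastRemoteUploadedSegmentMetadata()` accessor in this file follows a guard-then-fetch shape: reject shards that are not remote-backed, read the latest metadata, and report a missing file as an exception instead of returning null. A minimal sketch of that shape is below. `SegmentMetadata`, `MetadataReader` and `fetchLast` are hypothetical stand-ins for illustration, not OpenSearch types.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

final class LastUploadedMetadataSketch {

    /** Hypothetical stand-in for the remote segment metadata; only two fields are modeled. */
    static final class SegmentMetadata {
        final long primaryTerm;
        final long generation;

        SegmentMetadata(long primaryTerm, long generation) {
            this.primaryTerm = primaryTerm;
            this.generation = generation;
        }
    }

    /** Hypothetical reader; returns null when no metadata file has been uploaded. */
    interface MetadataReader {
        SegmentMetadata readLatest() throws IOException;
    }

    static SegmentMetadata fetchLast(boolean remoteBacked, MetadataReader reader) throws IOException {
        if (!remoteBacked) {
            // Caller error: there is no remote store to read from.
            throw new IllegalStateException("Index is not assigned on Remote Node");
        }
        SegmentMetadata latest = reader.readLatest();
        if (latest == null) {
            // A missing file is reported as an IOException subtype, so callers
            // can handle it together with other remote read failures.
            throw new FileNotFoundException("No metadata file found in remote store");
        }
        return latest;
    }
}
```

Because `FileNotFoundException` extends `IOException`, a caller that catches `IOException` around this call also sees the "nothing uploaded" case.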
server/src/main/java/org/opensearch/repositories/Repository.java
@@ -416,6 +416,45 @@
throw new UnsupportedOperationException();
}

/**
* Adds a reference of remote store data for a index commit point.
* <p>
* The index commit point can be obtained by using {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method.
* Or for closed index can be obtained by reading last remote uploaded metadata by using {@link org.opensearch.index.shard.IndexShard#fetchLastRemoteUploadedSegmentMetadata()} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
* @param snapshotStatus snapshot status
* @param primaryTerm current Primary Term
* @param commitGeneration current commit generation
* @param startTime start time of the snapshot commit, this will be used as the start time for snapshot.
* @param indexFilesToFileLengthMap map of index files to file length
* @param listener listener invoked on completion
*/
default void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
@Nullable IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
@Nullable Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
throw new UnsupportedOperationException();
}

/**
* Restores snapshot of the shard.
* <p>
server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -3744,6 +3744,33 @@
}
}

@Override
public void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long startTime,
ActionListener<String> listener
) {
snapshotRemoteStoreIndexShard(
store,
snapshotId,
indexId,
snapshotIndexCommit,
shardStateIdentifier,
snapshotStatus,
primaryTerm,
snapshotIndexCommit.getGeneration(),
startTime,
null,
listener
);
}

@Override
public void snapshotRemoteStoreIndexShard(
Store store,
@@ -3753,27 +3780,38 @@
String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
+ long commitGeneration,
long startTime,
+ Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}

final ShardId shardId = store.shardId();
try {
final String generation = snapshotStatus.generation();
logger.info("[{}] [{}] shallow copy snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);

long indexTotalFileSize = 0;
- // local store is being used here to fetch the files metadata instead of remote store as currently
- // remote store is mirroring the local store.
- List<String> fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
- Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
- for (String fileName : fileNames) {
- indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
+ List<String> fileNames;
+
+ if (snapshotIndexCommit != null) {
+ // local store is being used here to fetch the files metadata instead of remote store as currently
+ // remote store is mirroring the local store.
+ fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
+ Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
+ for (String fileName : fileNames) {
+ indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
+ }
+ } else {
+ fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet());
+ indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();
}

int indexTotalNumberOfFiles = fileNames.size();

snapshotStatus.moveToStarted(
@@ -3784,7 +3822,7 @@
indexTotalFileSize
);

- final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
+ final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration);

// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
@@ -3795,7 +3833,7 @@
snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
primaryTerm,
- snapshotIndexCommit.getGeneration(),
+ commitGeneration,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
indexTotalNumberOfFiles,
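The BlobStoreRepository change above computes the file list and total size from one of two sources: the local commit when one is supplied, otherwise the file-to-length map read from remote metadata. A minimal sketch of that branch follows. `CommitView`, `ShallowSnapshotFileStats` and `compute` are hypothetical names for illustration, and the explicit null check on the map is an assumption of this sketch, not something the PR adds.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ShallowSnapshotFileStats {

    /** Hypothetical, simplified view of a commit: file names plus a per-file length lookup. */
    interface CommitView {
        Collection<String> fileNames();

        long length(String fileName);
    }

    final List<String> fileNames;
    final long totalSize;

    private ShallowSnapshotFileStats(List<String> fileNames, long totalSize) {
        this.fileNames = fileNames;
        this.totalSize = totalSize;
    }

    static ShallowSnapshotFileStats compute(CommitView commit, Map<String, Long> remoteFileLengths) {
        List<String> names;
        long total = 0;
        if (commit != null) {
            // Open-index path: sizes come from the local commit's files.
            names = new ArrayList<>(commit.fileNames());
            for (String name : names) {
                total += commit.length(name);
            }
        } else {
            // Closed-index path: sizes come from the remote metadata map.
            // Assumption of this sketch: fail fast when neither source is given.
            Objects.requireNonNull(remoteFileLengths, "a commit or a file-length map is required");
            names = new ArrayList<>(remoteFileLengths.keySet());
            total = remoteFileLengths.values().stream().mapToLong(Long::longValue).sum();
        }
        return new ShallowSnapshotFileStats(names, total);
    }
}
```

Passing the commit generation as a separate `long`, as the PR does, keeps the rest of the method independent of which source was used.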
server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java
@@ -44,6 +44,7 @@
import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
@@ -63,6 +64,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus.Stage;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
@@ -74,7 +76,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
- import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -371,7 +372,9 @@
ActionListener<String> listener
) {
try {
- final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
+ final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+ final IndexShard indexShard = indexService.getShardOrNull(shardId.id());
+ final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE;
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
@@ -398,36 +401,56 @@
if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) {
long startTime = threadPool.relativeTimeInMillis();
long primaryTerm = indexShard.getOperationPrimaryTerm();
- // we flush first to make sure we get the latest writes snapshotted
- wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
- IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
- long commitGeneration = snapshotIndexCommit.getGeneration();
+ long commitGeneration = 0L;
+ Map<String, Long> indexFilesToFileLengthMap = null;
+ IndexCommit snapshotIndexCommit = null;
+
try {
+ if (closedIndex) {
+ RemoteSegmentMetadata lastRemoteUploadedIndexCommit = indexShard.fetchLastRemoteUploadedSegmentMetadata();
+ indexFilesToFileLengthMap = lastRemoteUploadedIndexCommit.getMetadata()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength()));
+ primaryTerm = lastRemoteUploadedIndexCommit.getPrimaryTerm();
+ commitGeneration = lastRemoteUploadedIndexCommit.getGeneration();
+ } else {
+ wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
+ snapshotIndexCommit = wrappedSnapshot.get();
+ commitGeneration = snapshotIndexCommit.getGeneration();
+ }
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
- } catch (NoSuchFileException e) {
- wrappedSnapshot.close();
- logger.warn(
- "Exception while acquiring lock on primaryTerm = {} and generation = {}",
- primaryTerm,
- commitGeneration
- );
- indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
- wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
- snapshotIndexCommit = wrappedSnapshot.get();
- commitGeneration = snapshotIndexCommit.getGeneration();
- indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
+ } catch (IOException e) {
+ if (closedIndex) {
+ logger.warn("Exception while reading latest metadata file from remote store");
+ listener.onFailure(e);
+ } else {
+ wrappedSnapshot.close();
+ logger.warn(
+ "Exception while acquiring lock on primaryTerm = {} and generation = {}",
+ primaryTerm,
+ commitGeneration
+ );
+ indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
+ wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
+ snapshotIndexCommit = wrappedSnapshot.get();
+ commitGeneration = snapshotIndexCommit.getGeneration();
+ indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
+ }
}
try {
repository.snapshotRemoteStoreIndexShard(
indexShard.store(),
snapshot.getSnapshotId(),
indexId,
snapshotIndexCommit,
- getShardStateId(indexShard, snapshotIndexCommit),
+ null,
snapshotStatus,
primaryTerm,
+ commitGeneration,
startTime,
- ActionListener.runBefore(listener, wrappedSnapshot::close)
+ indexFilesToFileLengthMap,
+ closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close)
);
} catch (IndexShardSnapshotFailedException e) {
logger.error(
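For a closed index, the SnapshotShardsService change above builds the file-name-to-length map from the last uploaded remote metadata with `Collectors.toMap`, instead of acquiring a local commit. A minimal sketch of that transformation follows. `UploadedFile` and `toFileLengths` are hypothetical stand-ins for illustration; only the `getLength()` accessor is modeled.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class FileLengthMapSketch {

    /** Hypothetical stand-in for a per-file metadata entry; only the length is modeled. */
    static final class UploadedFile {
        private final long length;

        UploadedFile(long length) {
            this.length = length;
        }

        long getLength() {
            return length;
        }
    }

    /** Projects a name-to-metadata map onto a name-to-length map, keeping the same keys. */
    static Map<String, Long> toFileLengths(Map<String, UploadedFile> metadata) {
        return metadata.entrySet()
            .stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength()));
    }
}
```

The keys come from an existing map, so they are already unique and the two-argument `Collectors.toMap` is enough; no merge function is needed.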