Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new query profile collector fields with concurrent search execution ([#7898](https://github.com/opensearch-project/OpenSearch/pull/7898))
- Align range and default value for deletes_pct_allowed in merge policy ([#7730](https://github.com/opensearch-project/OpenSearch/pull/7730))
- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#8071](https://github.com/opensearch-project/OpenSearch/pull/8071))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) {
ImmutableOpenMap.of(),
null,
SnapshotInfoTests.randomUserMetadata(),
randomVersion(random())
randomVersion(random()),
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public static Entry startedEntry(
long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
return new SnapshotsInProgress.Entry(
snapshot,
Expand All @@ -137,7 +138,8 @@ public static Entry startedEntry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -174,7 +176,8 @@ public static Entry startClone(
Collections.emptyMap(),
version,
source,
ImmutableOpenMap.of()
ImmutableOpenMap.of(),
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
);
}

Expand All @@ -187,6 +190,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean remoteStoreIndexShallowCopy;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
Expand Down Expand Up @@ -229,7 +233,8 @@ public Entry(
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -245,7 +250,8 @@ public Entry(
userMetadata,
version,
null,
ImmutableOpenMap.of()
ImmutableOpenMap.of(),
remoteStoreIndexShallowCopy
);
}

Expand All @@ -263,7 +269,8 @@ private Entry(
Map<String, Object> userMetadata,
Version version,
@Nullable SnapshotId source,
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones,
boolean remoteStoreIndexShallowCopy
) {
this.state = state;
this.snapshot = snapshot;
Expand All @@ -284,6 +291,7 @@ private Entry(
} else {
this.clones = clones;
}
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

Expand Down Expand Up @@ -324,6 +332,11 @@ private Entry(StreamInput in) throws IOException {
source = null;
clones = ImmutableOpenMap.of();
}
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
remoteStoreIndexShallowCopy = in.readBoolean();
} else {
remoteStoreIndexShallowCopy = false;
}
}

private static boolean assertShardsConsistent(
Expand Down Expand Up @@ -378,7 +391,8 @@ public Entry(
long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -392,7 +406,8 @@ public Entry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -417,7 +432,8 @@ public Entry(
shards,
failure,
entry.userMetadata,
version
version,
entry.remoteStoreIndexShallowCopy
);
}

Expand All @@ -441,7 +457,8 @@ public Entry withRepoGen(long newRepoGen) {
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -463,7 +480,8 @@ public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus>
userMetadata,
version,
source,
updatedClones
updatedClones,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -518,7 +536,8 @@ public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State s
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -544,7 +563,8 @@ public Entry withShardStates(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shar
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}
return withStartedShards(shards);
Expand All @@ -567,7 +587,8 @@ public Entry withStartedShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> sh
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
assert updated.state().completed() == false && completed(updated.shards().values()) == false
: "Only running snapshots allowed but saw [" + updated + "]";
Expand Down Expand Up @@ -599,6 +620,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public Map<String, Object> userMetadata() {
return userMetadata;
}
Expand Down Expand Up @@ -662,7 +687,7 @@ public boolean equals(Object o) {
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;

if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false;
return true;
}

Expand All @@ -679,6 +704,7 @@ public int hashCode() {
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -752,6 +778,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(source);
out.writeMap(clones);
}
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeBoolean(remoteStoreIndexShallowCopy);
}
}

@Override
Expand Down
40 changes: 40 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,46 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException {
GatedCloseable<IndexCommit> indexCommit = acquireLastIndexCommit(flushFirst);
getEngine().refresh("Snapshot for Remote Store based Shard");
return indexCommit;
}

/**
*
* @param snapshotId Snapshot UUID.
* @param primaryTerm current primary term.
* @param generation Snapshot Commit Generation.
* @throws IOException if there is some failure in acquiring lock in remote store.
*/
public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId);
}

/**
*
* @param snapshotId Snapshot UUID.
* @param primaryTerm current primary term.
* @param generation Snapshot Commit Generation.
* @throws IOException if there is some failure in releasing lock in remote store.
*/
public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId);
}

private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() {
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
Expand Down
Loading