[Segment Replication] Update getSegmentInfosSnapshot() logic to return SegmentInfos with highest generation number #4288

Closed
wants to merge 3 commits into from
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
- Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4307](https://github.com/opensearch-project/OpenSearch/pull/4307))
- Do not fail replica shard due to primary closure ([#4133](https://github.com/opensearch-project/OpenSearch/pull/4133))
- [Segment Replication] Update getSegmentInfosSnapshot() logic to return SegmentInfos with the highest generation number

### Security

@@ -2131,19 +2131,30 @@ protected SegmentInfos getLatestSegmentInfos() {
}

/**
* Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()}
* but also increment the ref-count to ensure that these segment files are retained
* until the reference is closed. On close, the ref-count is decremented.
* Fetch the latest {@link SegmentInfos} object. The method reads the in-memory SegmentInfos via {@link #getLatestSegmentInfos()}
* and the on-disk SegmentInfos, and returns the instance with the highest segment generation. This method also
* increments the ref-count to ensure that these segment files are retained until the reference is closed. On close,
* the ref-count is decremented.
*/
@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
final SegmentInfos inMemorySegInfos = getLatestSegmentInfos();
SegmentInfos result;
try {
indexWriter.incRefDeleter(segmentInfos);
final long lastCommitGeneration = store.getLastCommitGeneration();
final long generation = inMemorySegInfos.getGeneration();
if (generation < lastCommitGeneration) {
// the latest in memory infos is behind disk, read the latest SegmentInfos from disk and return that.
final SegmentInfos latestCommit = store.readLastCommittedSegmentsInfo();
result = latestCommit;
} else {
result = inMemorySegInfos;
}
indexWriter.incRefDeleter(result);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
return new GatedCloseable<>(result, () -> indexWriter.decRefDeleter(result));
}

@Override
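
For readers following the diff, the selection rule in getSegmentInfosSnapshot() can be sketched in plain Java without Lucene or OpenSearch on the classpath. `Infos`, `RefCounter`, and `Snapshot` below are hypothetical stand-ins for SegmentInfos, the IndexWriter ref-counting calls, and GatedCloseable. They are assumptions made for illustration only, not the project's classes.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: every type here is a hypothetical stand-in.
class SnapshotSelectionSketch {

    /** Stand-in for SegmentInfos; only the generation matters for the selection. */
    static final class Infos {
        final String source;
        final long generation;

        Infos(String source, long generation) {
            this.source = source;
            this.generation = generation;
        }
    }

    /** Stand-in for the writer's incRefDeleter / decRefDeleter bookkeeping. */
    static final class RefCounter {
        private final Map<Infos, Integer> counts = new HashMap<>();

        void incRef(Infos infos) {
            counts.merge(infos, 1, Integer::sum);
        }

        void decRef(Infos infos) {
            // Returning null from the remapping function removes the entry.
            counts.computeIfPresent(infos, (k, v) -> v == 1 ? null : v - 1);
        }

        int count(Infos infos) {
            return counts.getOrDefault(infos, 0);
        }
    }

    /** Stand-in for GatedCloseable; this sketch runs the release action at most once. */
    static final class Snapshot implements Closeable {
        private final Infos infos;
        private final Runnable onClose;
        private boolean closed;

        Snapshot(Infos infos, Runnable onClose) {
            this.infos = infos;
            this.onClose = onClose;
        }

        Infos get() {
            return infos;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                onClose.run();
            }
        }
    }

    /** Picks the copy with the highest generation and holds a reference on it. */
    static Snapshot snapshot(Infos inMemory, Infos onDisk, RefCounter refs) {
        // Strict '<' as in the diff: on a tie the in-memory copy is returned.
        final Infos result = inMemory.generation < onDisk.generation ? onDisk : inMemory;
        refs.incRef(result);
        return new Snapshot(result, () -> refs.decRef(result));
    }

    public static void main(String[] args) {
        RefCounter refs = new RefCounter();
        Infos memory = new Infos("memory", 3);
        Infos disk = new Infos("disk", 4);
        try (Snapshot s = snapshot(memory, disk, refs)) {
            System.out.println("picked " + s.get().source + ", refs held: " + refs.count(s.get()));
        }
        System.out.println("refs after close: " + refs.count(disk));
    }
}
```

The reference is taken on whichever copy is returned, and the close action releases that same copy, which is why the diff switches both the incRefDeleter and decRefDeleter calls from segmentInfos to result.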
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -334,6 +334,13 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Returns the generation number (the N in segments_N) of the last commit on disk.
*/
public long getLastCommitGeneration() throws IOException {
Member

Do we need this method on the Store? Can we call

SegmentInfos.getLastCommitGeneration(store.directory());

directly from InternalEngine?

Member Author

I created the new method for consistency, to keep disk (store) related calls on Store.

return SegmentInfos.getLastCommitGeneration(directory());
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
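
The "last commit generation" discussed in the Store.java thread above comes from the names of the commit files in the index directory. The following is a minimal sketch of that lookup in plain Java. It assumes, as an assumption of this sketch to be checked against the Lucene source, that commit files are named `segments_N` with N written in base 36, that a bare `segments` file means generation 0, and that -1 signals no commit. The class and method names are hypothetical and are not the Lucene API.

```java
// Illustrative sketch only; names here are hypothetical, not Lucene's API.
class SegmentsFileGenerationSketch {

    static final String SEGMENTS = "segments";

    /** Parses the generation out of a commit file name such as "segments_a". */
    static long generationFromFileName(String fileName) {
        if (fileName.equals(SEGMENTS)) {
            return 0L;
        }
        if (fileName.startsWith(SEGMENTS + "_")) {
            // Assumption: the suffix is the generation encoded in base 36.
            return Long.parseLong(fileName.substring(SEGMENTS.length() + 1), Character.MAX_RADIX);
        }
        throw new IllegalArgumentException("not a segments file: " + fileName);
    }

    /** Returns the highest generation among the listed files, or -1 if there is no commit. */
    static long lastCommitGeneration(String[] files) {
        long max = -1L;
        for (String file : files) {
            // Skip anything that is not a commit file (segment data, locks, and so on).
            if (file.equals(SEGMENTS) || file.startsWith(SEGMENTS + "_")) {
                max = Math.max(max, generationFromFileName(file));
            }
        }
        return max;
    }

    public static void main(String[] args) {
        String[] listing = { "_0.cfs", "segments_2", "segments_3", "write.lock" };
        System.out.println("last commit generation: " + lastCommitGeneration(listing));
    }
}
```

Because the lookup only lists file names, it is cheap compared with reading the full commit, which fits the diff's approach of checking the generation first and reading the on-disk SegmentInfos only when the in-memory copy is behind.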
@@ -7546,6 +7546,61 @@ public void testGetSegmentInfosSnapshot() throws IOException {
engine.close();
}

// This test simulates the on-disk SegmentInfos having a higher segment generation than the
// in-memory SegmentInfos and verifies that engine.getSegmentInfosSnapshot returns the on-disk SegmentInfos
public void testGetSegmentInfosWithHighestGenOnDisk() throws IOException {
IOUtils.close(store, engine);
Store store = spy(createStore());
InternalEngine engine = createEngine(store, createTempDir());
final int numDocs = randomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (randomBoolean()) {
engine.refresh("test");
}
}
engine.flush(true, true);

SegmentInfos sisDisk = store.readLastCommittedSegmentsInfo();
// Increment generation number of on-disk SegmentInfos
sisDisk.setNextWriteGeneration(sisDisk.getGeneration() + 1);

when(store.getLastCommitGeneration()).thenReturn(sisDisk.getGeneration());
Member

Rather than mocking here, you should be able to invoke SegmentInfos.commit and create a new _N file with a higher generation.

Member Author

@dreamer-89 Aug 24, 2022

Thanks for sharing this, I wasn't aware of this method.

I tried SegmentInfos.commit, but it also updated the in-memory copy of SegmentInfos, i.e. both sisDisk and sisInMemory end up with the same generation number. So, for the tests, I need to use mocks to give the on-disk copy a higher segment generation.

when(store.readLastCommittedSegmentsInfo()).thenReturn(sisDisk);

GatedCloseable<SegmentInfos> segmentInfosSnapshot = engine.getSegmentInfosSnapshot();
// JUnit's assertEquals takes the expected value first
assertEquals(sisDisk, segmentInfosSnapshot.get());
assertEquals(sisDisk.getGeneration(), segmentInfosSnapshot.get().getGeneration());
segmentInfosSnapshot.close();
store.close();
engine.close();
}

// This test verifies that when the on-disk segment generation is not higher than the in-memory copy's,
// engine.getSegmentInfosSnapshot returns the in-memory SegmentInfos
public void testGetSegmentInfosWithHighestGenInMemory() throws IOException {
Member

-> In this test case the in-memory and on-disk SegmentInfos will have the same segment generation, not a lower generation on disk as the comment says (and as the test intends). We are not incrementing or decrementing the generation of either copy, so both stay the same, and the case we actually want is not tested.

-> Is this test really necessary? Will there ever be a scenario where the on-disk SegmentInfos has a lower segment generation than the in-memory one?

Member Author

  1. Looks like the comment is outdated. Updated.
  2. Yes, this is useful to ensure the in-memory copy gets picked up when its segment generation is higher than or equal to the on-disk copy's. And yes, this scenario happens during replica promotion.

IOUtils.close(store, engine);
Store store = spy(createStore());
InternalEngine engine = createEngine(store, createTempDir());
final int numDocs = randomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (randomBoolean()) {
engine.refresh("test");
}
}
engine.flush(true, true);

SegmentInfos sisInMemory = engine.getLatestSegmentInfos();

GatedCloseable<SegmentInfos> segmentInfosSnapshot = engine.getSegmentInfosSnapshot();
// JUnit's assertEquals takes the expected value first
assertEquals(sisInMemory, segmentInfosSnapshot.get());
assertEquals(sisInMemory.getGeneration(), segmentInfosSnapshot.get().getGeneration());
segmentInfosSnapshot.close();
store.close();
engine.close();
}

public void testGetProcessedLocalCheckpoint() throws IOException {
final long expectedLocalCheckpoint = 1L;
IOUtils.close(store, engine);