Skip to content

Commit 421d198

Browse files
Poojita-Rajbaba-devv
authored and committed
[Segment Replication][Remote Store] Remove commits when remote store is enabled (opensearch-project#8050)
* remove commits + fix failing test
  Signed-off-by: Poojita Raj <[email protected]>
* fix failing tests
  Signed-off-by: Poojita Raj <[email protected]>
* fix precommit failure
  Signed-off-by: Poojita Raj <[email protected]>
* remove logs
  Signed-off-by: Poojita Raj <[email protected]>
* address review comments
  Signed-off-by: Poojita Raj <[email protected]>
---------
Signed-off-by: Poojita Raj <[email protected]>
1 parent 93dc412 commit 421d198

File tree

3 files changed

+83
-42
lines changed

3 files changed

+83
-42
lines changed

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 20 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine {
5656
private final CompletionStatsCache completionStatsCache;
5757
private final LocalCheckpointTracker localCheckpointTracker;
5858
private final WriteOnlyTranslogManager translogManager;
59+
private final boolean shouldCommit;
5960

6061
private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
6162

@@ -113,6 +114,7 @@ public void onAfterTranslogSync() {
113114
engineConfig.getPrimaryModeSupplier()
114115
);
115116
this.translogManager = translogManagerRef;
117+
this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
116118
} catch (IOException e) {
117119
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
118120
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -163,7 +165,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
163165
* @throws IOException - When there is an IO error committing the SegmentInfos.
164166
*/
165167
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
166-
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
168+
if (shouldCommit) {
169+
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
170+
}
167171
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
168172
translogManager.syncTranslog();
169173
}
@@ -383,15 +387,21 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
383387
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
384388
: "Either the write lock must be held or the engine must be currently be failing itself";
385389
try {
386-
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
387-
/*
388-
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
389-
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
390-
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
391-
*/
392-
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
393-
latestSegmentInfos.changed();
394-
commitSegmentInfos(latestSegmentInfos);
390+
// if remote store is enabled, all segments durably persisted
391+
if (shouldCommit) {
392+
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
393+
/*
394+
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
395+
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
396+
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
397+
*/
398+
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
399+
latestSegmentInfos.changed();
400+
commitSegmentInfos(latestSegmentInfos);
401+
} else {
402+
store.directory().sync(List.of(store.directory().listAll()));
403+
store.directory().syncMetaData();
404+
}
395405
IOUtils.close(readerManager, translogManager, store::decRef);
396406
} catch (Exception e) {
397407
logger.warn("failed to close engine", e);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 25 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -4661,31 +4661,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
46614661
indexInput,
46624662
remoteSegmentMetadata.getGeneration()
46634663
);
4664+
// Replicas never need a local commit
46644665
if (shouldCommit) {
4665-
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
4666-
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
4667-
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
4668-
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
4669-
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
4670-
// commit.
4671-
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
4672-
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
4673-
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
4674-
if (localMaxSegmentInfos.isPresent()
4675-
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
4676-
- 1) {
4677-
// If remote translog is not enabled, local translog will be created with different UUID.
4678-
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
4679-
// to be same. Following code block make sure to have the same UUID.
4680-
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
4681-
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
4682-
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
4683-
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
4684-
infosSnapshot.setUserData(userData, false);
4666+
if (this.shardRouting.primary()) {
4667+
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
4668+
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
4669+
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
4670+
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
4671+
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
4672+
// latest commit.
4673+
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
4674+
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
4675+
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
4676+
if (localMaxSegmentInfos.isPresent()
4677+
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
4678+
- 1) {
4679+
// If remote translog is not enabled, local translog will be created with different UUID.
4680+
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
4681+
// to be same. Following code block make sure to have the same UUID.
4682+
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
4683+
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
4684+
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
4685+
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
4686+
infosSnapshot.setUserData(userData, false);
4687+
}
4688+
storeDirectory.deleteFile(localMaxSegmentInfos.get());
46854689
}
4686-
storeDirectory.deleteFile(localMaxSegmentInfos.get());
4690+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
46874691
}
4688-
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
46894692
} else {
46904693
finalizeReplication(infosSnapshot);
46914694
}

server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java

Lines changed: 38 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,14 @@ public class NRTReplicationEngineTests extends EngineTestCase {
4747
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
4848
);
4949

50+
private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
51+
"index",
52+
Settings.builder()
53+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
54+
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
55+
.build()
56+
);
57+
5058
public void testCreateEngine() throws IOException {
5159
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
5260
try (
@@ -132,6 +140,29 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
132140
}
133141
}
134142

143+
public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException {
144+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
145+
146+
try (
147+
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
148+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS)
149+
) {
150+
// assume we start at the same gen.
151+
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
152+
assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration());
153+
assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration());
154+
155+
// flush the primary engine - we don't need any segments, just force a new commit point.
156+
engine.flush(true, true);
157+
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());
158+
159+
// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
160+
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
161+
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
162+
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
163+
}
164+
}
165+
135166
public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException {
136167
// if the replica is already at segments_N that is received, it will commit segments_N+1.
137168
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
@@ -315,18 +346,11 @@ public void testCommitSegmentInfos() throws Exception {
315346
}
316347
}
317348

318-
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
349+
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings)
350+
throws IOException {
319351
Lucene.cleanLuceneIndex(store.directory());
320352
final Path translogDir = createTempDir();
321-
final EngineConfig replicaConfig = config(
322-
defaultSettings,
323-
store,
324-
translogDir,
325-
NoMergePolicy.INSTANCE,
326-
null,
327-
null,
328-
globalCheckpoint::get
329-
);
353+
final EngineConfig replicaConfig = config(settings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
330354
if (Lucene.indexExists(store.directory()) == false) {
331355
store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
332356
final String translogUuid = Translog.createEmptyTranslog(
@@ -339,4 +363,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint,
339363
}
340364
return new NRTReplicationEngine(replicaConfig);
341365
}
366+
367+
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
368+
return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings);
369+
}
342370
}

0 commit comments

Comments
 (0)