Skip to content

Commit e1a3a9f

Browse files
mch2dreamer-89
authored andcommitted
[Segment Replication] - Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards. (opensearch-project#4402)
* Segment Replication - Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards. This change updates replicas to commit SegmentInfos before the shard is closed, on receiving a new commit point from a primary, and when a new primary is detected. This change also makes the public commitSegmentInfos on NRTEngine obsolete, refactoring IndexShard to simply call reset on the engine. Signed-off-by: Marc Handalian <[email protected]> * Remove noise & extra log statement. Signed-off-by: Marc Handalian <[email protected]> * PR feedback. Signed-off-by: Marc Handalian <[email protected]> Signed-off-by: Marc Handalian <[email protected]>
1 parent 0b85fd7 commit e1a3a9f

File tree

10 files changed

+290
-151
lines changed

10 files changed

+290
-151
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
1111
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
1212
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
1313
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
14+
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
1415

1516
### Deprecated
1617

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

+25-16
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public class NRTReplicationEngine extends Engine implements LifecycleAware {
5656
private final LocalCheckpointTracker localCheckpointTracker;
5757
private final WriteOnlyTranslogManager translogManager;
5858

59+
private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
60+
5961
private static final int SI_COUNTER_INCREMENT = 10;
6062

6163
public NRTReplicationEngine(EngineConfig engineConfig) {
@@ -120,14 +122,16 @@ public TranslogManager translogManager() {
120122

121123
public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
122124
// Update the current infos reference on the Engine's reader.
125+
final long incomingGeneration = infos.getGeneration();
123126
readerManager.updateSegments(infos);
124127

125-
// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
126-
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
127-
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
128-
this.lastCommittedSegmentInfos = infos;
128+
// Commit and roll the xlog when we receive a different generation than what was last received.
129+
// lower/higher gens are possible from a new primary that was just elected.
130+
if (incomingGeneration != lastReceivedGen) {
131+
commitSegmentInfos();
129132
translogManager.rollTranslogGeneration();
130133
}
134+
lastReceivedGen = incomingGeneration;
131135
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
132136
}
133137

@@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
141145
*
142146
* @throws IOException - When there is an IO error committing the SegmentInfos.
143147
*/
144-
public void commitSegmentInfos() throws IOException {
145-
// TODO: This method should wait for replication events to finalize.
146-
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
147-
/*
148-
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
149-
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
150-
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
151-
*/
152-
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
153-
latestSegmentInfos.changed();
154-
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
148+
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
149+
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
150+
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
155151
translogManager.syncTranslog();
156152
}
157153

154+
protected void commitSegmentInfos() throws IOException {
155+
commitSegmentInfos(getLatestSegmentInfos());
156+
}
157+
158158
@Override
159159
public String getHistoryUUID() {
160160
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
@@ -405,7 +405,16 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
405405
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
406406
: "Either the write lock must be held or the engine must be currently be failing itself";
407407
try {
408-
IOUtils.close(readerManager, translogManager.getTranslog(), store::decRef);
408+
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
409+
/*
410+
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
411+
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
412+
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
413+
*/
414+
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
415+
latestSegmentInfos.changed();
416+
commitSegmentInfos(latestSegmentInfos);
417+
IOUtils.close(readerManager, translogManager, store::decRef);
409418
} catch (Exception e) {
410419
logger.warn("failed to close engine", e);
411420
} finally {

server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java

+3
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
7474
* @throws IOException - When Refresh fails with an IOException.
7575
*/
7676
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
77+
// roll over the currentInfo's generation, this ensures the on-disk gen
78+
// is always increased.
79+
infos.updateGeneration(currentInfos);
7780
currentInfos = infos;
7881
maybeRefresh();
7982
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+4-24
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ public void updateShardState(
620620
if (indexSettings.isSegRepEnabled()) {
621621
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
622622
assert newRouting.primary() && currentRouting.primary() == false;
623-
promoteNRTReplicaToPrimary();
623+
resetEngineToGlobalCheckpoint();
624624
}
625625
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
626626
ensurePeerRecoveryRetentionLeasesExist();
@@ -3571,7 +3571,9 @@ private void innerAcquireReplicaOperationPermit(
35713571
currentGlobalCheckpoint,
35723572
maxSeqNo
35733573
);
3574-
if (currentGlobalCheckpoint < maxSeqNo) {
3574+
// With Segment Replication enabled, we never want to reset a replica's engine unless
3575+
// it is promoted to primary.
3576+
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
35753577
resetEngineToGlobalCheckpoint();
35763578
} else {
35773579
getEngine().rollTranslogGeneration();
@@ -4132,26 +4134,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
41324134
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
41334135
return getEngine().getSegmentInfosSnapshot();
41344136
}
4135-
4136-
/**
4137-
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
4138-
*
4139-
* If this shard is currently using a replication engine, this method:
4140-
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
4141-
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
4142-
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
4143-
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
4144-
* any ack'd writes that were not copied to this replica before promotion.
4145-
*/
4146-
private void promoteNRTReplicaToPrimary() {
4147-
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
4148-
getReplicationEngine().ifPresentOrElse(engine -> {
4149-
try {
4150-
engine.commitSegmentInfos();
4151-
resetEngineToGlobalCheckpoint();
4152-
} catch (IOException e) {
4153-
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
4154-
}
4155-
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
4156-
}
41574137
}

server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
156156
+ temporaryFileName
157157
+ "] in "
158158
+ Arrays.toString(store.directory().listAll());
159-
store.directory().sync(Collections.singleton(temporaryFileName));
159+
// With Segment Replication, we will fsync after a full commit has been received.
160+
if (store.indexSettings().isSegRepEnabled() == false) {
161+
store.directory().sync(Collections.singleton(temporaryFileName));
162+
}
160163
IndexOutput remove = removeOpenIndexOutputs(name);
161164
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
162165
}

0 commit comments

Comments
 (0)