Skip to content

Commit

Permalink
Segment Replication - Commit SegmentInfos on replicas when new genera…
Browse files Browse the repository at this point in the history
…tion received.

This change updates NRTReplicationEngine to trigger a commit when a new segment generation is received from
the primary.  It also updates the engine to commit when a replica is closed so that it can restart from the same
segments.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Sep 5, 2022
1 parent a0ab4d7 commit b3f37e2
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine {
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

public NRTReplicationEngine(EngineConfig engineConfig) {
Expand Down Expand Up @@ -120,14 +122,16 @@ public TranslogManager translogManager() {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

Expand All @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
public void commitSegmentInfos() throws IOException {
// TODO: This method should wait for replication events to finalize.
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}

protected void commitSegmentInfos() throws IOException {
commitSegmentInfos(getLatestSegmentInfos());
}

@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
Expand Down Expand Up @@ -354,6 +354,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
}
Expand Down
28 changes: 4 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ public void updateShardState(
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
promoteNRTReplicaToPrimary();
resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
Expand Down Expand Up @@ -3557,7 +3557,9 @@ private void innerAcquireReplicaOperationPermit(
currentGlobalCheckpoint,
maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
// With Segment Replication enabled, we never want to reset a replica's engine unless
// it is promoted to primary.
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().translogManager().rollTranslogGeneration();
Expand Down Expand Up @@ -4120,26 +4122,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
*
* If this shard is currently using a replication engine, this method:
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
* any ack'd writes that were not copied to this replica before promotion.
*/
private void promoteNRTReplicaToPrimary() {
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
getReplicationEngine().ifPresentOrElse(engine -> {
try {
engine.commitSegmentInfos();
resetEngineToGlobalCheckpoint();
} catch (IOException e) {
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
}
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ temporaryFileName
+ "] in "
+ Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
// With Segment Replication, we will fsync after a full commit has been received.
if (store.indexSettings().isSegRepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.indices.replication.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
Expand Down Expand Up @@ -41,7 +39,6 @@ public class CopyState extends AbstractRefCounted {
private final byte[] infosBytes;
private GatedCloseable<IndexCommit> commitRef;
private final IndexShard shard;
public static final Logger logger = LogManager.getLogger(CopyState.class);

public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException {
super("CopyState-" + shard.shardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
Expand All @@ -36,17 +34,21 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

public class NRTReplicationEngineTests extends EngineTestCase {

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
);

public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore();
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
Expand All @@ -70,7 +72,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore();
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
Expand Down Expand Up @@ -104,88 +106,63 @@ public void testEngineWritesOpsToTranslog() throws Exception {
}
}

public void testUpdateSegments() throws Exception {
public void testUpdateSegments_CommitOnGenIncrease() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore();
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
// add docs to the primary engine.
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 500),
randomBoolean(),
randomBoolean(),
randomBoolean(),
Engine.Operation.TYPE.INDEX
);

for (Engine.Operation op : operations) {
applyOperation(engine, op);
applyOperation(nrtEngine, op);
}

engine.refresh("test");

final SegmentInfos latestPrimaryInfos = engine.getLatestSegmentInfos();
nrtEngine.updateSegments(latestPrimaryInfos, engine.getProcessedLocalCheckpoint());
assertMatchingSegmentsAndCheckpoints(nrtEngine, latestPrimaryInfos);

// assert a doc from the operations exists.
final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null);
try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}

try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration());

// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

// Flush the primary and update the NRTEngine with the latest committed infos.
engine.flush();
nrtEngine.translogManager().syncTranslog(); // to advance persisted checkpoint
public void updateSegments_replicaAtLowerCommitGen() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
// commit the infos to push us to segments_3.
nrtEngine.commitSegmentInfos();

nrtEngine.ensureOpen();
try (
Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot()
) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
// update the replica with a lower gen.
assertEquals(2, engine.getLatestSegmentInfos().getGeneration());
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

final SegmentInfos primaryInfos = engine.getLastCommittedSegmentInfos();
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos);
public void updateSegments_ReplicaAlreadyAtCommitGen() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

assertEquals(
assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().getGeneration().translogFileGeneration,
assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getGeneration().translogFileGeneration
);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
// commit the infos to push us to segments_3.
nrtEngine.commitSegmentInfos();

try (
Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot()
) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
// update the replica with a lower gen.
assertEquals(2, engine.getLatestSegmentInfos().getGeneration());
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());

// Ensure the same hit count between engines.
int expectedDocCount;
try (final Engine.Searcher test = engine.acquireSearcher("test")) {
expectedDocCount = test.count(Queries.newMatchAllQuery());
assertSearcherHits(nrtEngine, expectedDocCount);
}
assertEngineCleanedUp(nrtEngine, assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getDeletionPolicy());
nrtEngine.close();
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
}
}

Expand Down Expand Up @@ -227,12 +204,9 @@ public void testCommitSegmentInfos() throws Exception {
// This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints
// stored in user data.
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
);

try (
final Store nrtEngineStore = createStore(indexSettings, newDirectory());
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
Expand Down
Loading

0 comments on commit b3f37e2

Please sign in to comment.