From b59e9deac9c0ef2e570bcfc1364a7e9c95814360 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 4 Sep 2022 17:09:46 -0700 Subject: [PATCH] Segment Replication - Commit SegmentInfos on replicas when new generation received. This change updates NRTReplicationEngine to trigger a commit when a new segment generation is received from the primary. It also updates the engine to commit when a replica is closed so that it can restart from the same segments. Signed-off-by: Marc Handalian --- .../index/engine/NRTReplicationEngine.java | 35 ++-- .../engine/NRTReplicationReaderManager.java | 1 + .../opensearch/index/shard/IndexShard.java | 26 +-- .../indices/recovery/MultiFileWriter.java | 5 +- .../engine/NRTReplicationEngineTests.java | 134 +++++-------- .../SegmentReplicationIndexShardTests.java | 187 +++++++++++++++++- .../index/shard/IndexShardTestCase.java | 10 +- 7 files changed, 273 insertions(+), 125 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index cf753e3360c39..1dba42a5a95c7 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine { private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED; + private static final int SI_COUNTER_INCREMENT = 10; public NRTReplicationEngine(EngineConfig engineConfig) { @@ -120,14 +122,16 @@ public TranslogManager translogManager() { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. 
+ long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point. - if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) { - this.lastCommittedSegmentInfos = infos; + if (incomingGeneration != lastReceivedGen) { + commitSegmentInfos(); translogManager.rollTranslogGeneration(); } + lastReceivedGen = incomingGeneration; localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th * * @throws IOException - When there is an IO error committing the SegmentInfos. */ - public void commitSegmentInfos() throws IOException { - // TODO: This method should wait for replication events to finalize. - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - /* - This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied - from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is - used to generate new segment file names. The ideal solution is to identify the counter from previous primary. 
- */ - latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; - latestSegmentInfos.changed(); - store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + private void commitSegmentInfos(SegmentInfos infos) throws IOException { + store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); translogManager.syncTranslog(); } + protected void commitSegmentInfos() throws IOException { + commitSegmentInfos(getLatestSegmentInfos()); + } + @Override public String getHistoryUUID() { return loadHistoryUUID(lastCommittedSegmentInfos.userData); @@ -354,6 +354,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + /* + This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied + from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is + used to generate new segment file names. The ideal solution is to identify the counter from previous primary. 
+ */ + latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; + latestSegmentInfos.changed(); + commitSegmentInfos(latestSegmentInfos); IOUtils.close(readerManager, translogManager, store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 16e615672a26f..d1d5adc51ad0f 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -74,6 +74,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re * @throws IOException - When Refresh fails with an IOException. */ public synchronized void updateSegments(SegmentInfos infos) throws IOException { + infos.updateGeneration(currentInfos); currentInfos = infos; maybeRefresh(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 670af1f1c6fd9..2e20c23e1819a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -623,7 +623,7 @@ public void updateShardState( if (indexSettings.isSegRepEnabled()) { // this Shard's engine was read only, we need to update its engine before restoring local history from xlog. 
assert newRouting.primary() && currentRouting.primary() == false; - promoteNRTReplicaToPrimary(); + resetEngineToGlobalCheckpoint(); } replicationTracker.activatePrimaryMode(getLocalCheckpoint()); ensurePeerRecoveryRetentionLeasesExist(); @@ -3557,7 +3557,7 @@ private void innerAcquireReplicaOperationPermit( currentGlobalCheckpoint, maxSeqNo ); - if (currentGlobalCheckpoint < maxSeqNo) { + if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) { resetEngineToGlobalCheckpoint(); } else { getEngine().translogManager().rollTranslogGeneration(); @@ -4120,26 +4120,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() { public GatedCloseable getSegmentInfosSnapshot() { return getEngine().getSegmentInfosSnapshot(); } - - /** - * With segment replication enabled - prepare the shard's engine to be promoted as the new primary. - * - * If this shard is currently using a replication engine, this method: - * 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point. - * InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos - * that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion. - * 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be - * any ack'd writes that were not copied to this replica before promotion. 
- */ - private void promoteNRTReplicaToPrimary() { - assert shardRouting.primary() && indexSettings.isSegRepEnabled(); - getReplicationEngine().ifPresentOrElse(engine -> { - try { - engine.commitSegmentInfos(); - resetEngineToGlobalCheckpoint(); - } catch (IOException e) { - throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e); - } - }, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); }); - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index 3509615052707..ec3986017afac 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll()); - store.directory().sync(Collections.singleton(temporaryFileName)); + // With Segment Replication, we will fsync after a full commit has been received. 
+ if (store.indexSettings().isSegRepEnabled() == false) { + store.directory().sync(Collections.singleton(temporaryFileName)); + } IndexOutput remove = removeOpenIndexOutputs(name); assert remove == null || remove == indexOutput; // remove maybe null if we got finished } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 540054782133a..71947d7c69368 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -15,10 +15,8 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; -import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; -import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.Store; @@ -36,17 +34,21 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; public class NRTReplicationEngineTests extends EngineTestCase { + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() + ); + public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore 
= createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); @@ -70,7 +72,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore = createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica( @@ -104,88 +106,63 @@ public void testEngineWritesOpsToTranslog() throws Exception { } } - public void testUpdateSegments() throws Exception { + public void testUpdateSegments_CommitOnGenIncrease() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore = createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { - // add docs to the primary engine. - List operations = generateHistoryOnReplica( - between(1, 500), - randomBoolean(), - randomBoolean(), - randomBoolean(), - Engine.Operation.TYPE.INDEX - ); - - for (Engine.Operation op : operations) { - applyOperation(engine, op); - applyOperation(nrtEngine, op); - } - - engine.refresh("test"); - - final SegmentInfos latestPrimaryInfos = engine.getLatestSegmentInfos(); - nrtEngine.updateSegments(latestPrimaryInfos, engine.getProcessedLocalCheckpoint()); - assertMatchingSegmentsAndCheckpoints(nrtEngine, latestPrimaryInfos); - - // assert a doc from the operations exists. 
- final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null); - try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) { - assertThat(getResult.exists(), equalTo(true)); - assertThat(getResult.docIdAndVersion(), notNullValue()); - } - - try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) { - assertThat(getResult.exists(), equalTo(true)); - assertThat(getResult.docIdAndVersion(), notNullValue()); - } + // assume we start at the same gen. + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration()); + + // flush the primary engine - we don't need any segments, just force a new commit point. + engine.flush(true, true); + assertEquals(3, engine.getLatestSegmentInfos().getGeneration()); + nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint()); + assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration()); + } + } - // Flush the primary and update the NRTEngine with the latest committed infos. - engine.flush(); - nrtEngine.translogManager().syncTranslog(); // to advance persisted checkpoint + public void updateSegments_replicaAtLowerCommitGen() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + // commit the infos to push us to segments_3. 
+ nrtEngine.commitSegmentInfos(); - nrtEngine.ensureOpen(); - try ( - Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() - ) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); - assertThat( - TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), - equalTo(seqNos) - ); - } + // update the replica with a lower gen. + assertEquals(2, engine.getLatestSegmentInfos().getGeneration()); + nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint()); + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + } + } - final SegmentInfos primaryInfos = engine.getLastCommittedSegmentInfos(); - nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint()); - assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos); + public void updateSegments_ReplicaAlreadyAtCommitGen() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - assertEquals( - assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().getGeneration().translogFileGeneration, - assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getGeneration().translogFileGeneration - ); + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + // commit the infos to push us to segments_3. 
+ nrtEngine.commitSegmentInfos(); - try ( - Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() - ) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); - assertThat( - TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), - equalTo(seqNos) - ); - } + // update the replica with a lower gen. + assertEquals(2, engine.getLatestSegmentInfos().getGeneration()); + nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint()); + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); - // Ensure the same hit count between engines. - int expectedDocCount; - try (final Engine.Searcher test = engine.acquireSearcher("test")) { - expectedDocCount = test.count(Queries.newMatchAllQuery()); - assertSearcherHits(nrtEngine, expectedDocCount); - } - assertEngineCleanedUp(nrtEngine, assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getDeletionPolicy()); + nrtEngine.close(); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); } } @@ -227,12 +204,9 @@ public void testCommitSegmentInfos() throws Exception { // This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints // stored in user data. 
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - "index", - Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() - ); + try ( - final Store nrtEngineStore = createStore(indexSettings, newDirectory()); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()) diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 3af882a8087ec..cf6e76cd220a0 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -8,6 +8,8 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -15,6 +17,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -48,6 +51,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -66,7 +70,7 @@ public class SegmentReplicationIndexShardTests extends 
OpenSearchIndexLevelRepli .build(); /** - * Test that latestReplicationCheckpoint returns null only for docrep enabled indices + * Test that latestReplicationCheckpoint returns null only for docrep enabled indices */ public void testReplicationCheckpointNullForDocRep() throws IOException { Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "DOCUMENT").put(Settings.EMPTY).build(); @@ -76,7 +80,7 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { } /** - * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices + * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ public void testReplicationCheckpointNotNullForSegReb() throws IOException { Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); @@ -205,6 +209,170 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { closeShards(shard); } + public void testReplicaReceivesGenIncrease() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + final int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + flushShard(primary, true); + replicateSegments(primary, shards.getReplicas()); + + final int totalDocs = numDocs + shards.indexDocs(randomIntBetween(numDocs + 1, numDocs + 10)); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + + assertEqualCommittedSegments(primary, replica); + assertDocCount(primary, totalDocs); + assertDocCount(replica, totalDocs); + } + } + + public void testReplicaReceivesLowerGeneration() throws Exception { + // when a replica gets incoming segments that are lower than what it currently has on disk. 
+ + // start 3 nodes Gens: P [2], R [2], R[2] + // index some docs and flush twice, push to only 1 replica. + // State Gens: P [4], R-1 [3], R-2 [2] + // Promote R-2 as the new primary and demote the old primary. + // State Gens: R[4], R-1 [3], P [4] - *commit on close of NRTEngine, xlog replayed and commit made. + // index docs on new primary and flush + // replicate to all. + // Expected result: State Gens: P[4], R-1 [4], R-2 [4] + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica_1 = shards.getReplicas().get(0); + final IndexShard replica_2 = shards.getReplicas().get(1); + int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + flushShard(primary, false); + replicateSegments(primary, List.of(replica_1)); + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + shards.indexDocs(numDocs); + flushShard(primary, false); + assertLatestCommitGen(4, primary); + replicateSegments(primary, List.of(replica_1)); + + assertEqualCommittedSegments(primary, replica_1); + assertLatestCommitGen(4, primary, replica_1); + assertLatestCommitGen(2, replica_2); + + shards.promoteReplicaToPrimary(replica_2).get(); + primary.close("demoted", false); + primary.store().close(); + IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); + shards.recoverReplica(oldPrimary); + assertLatestCommitGen(4, oldPrimary); + assertEqualCommittedSegments(oldPrimary, replica_1); + + assertLatestCommitGen(4, replica_2); + + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + shards.indexDocs(numDocs); + flushShard(replica_2, false); + replicateSegments(replica_2, shards.getReplicas()); + assertEqualCommittedSegments(replica_2, oldPrimary, replica_1); + } + } + + private void assertLatestCommitGen(long expected, IndexShard... 
shards) throws IOException { + for (IndexShard indexShard : shards) { + try (final GatedCloseable commit = indexShard.acquireLastIndexCommit(false)) { + assertEquals(expected, commit.get().getGeneration()); + } + } + } + + private void assertEqualCommittedSegments(IndexShard primary, IndexShard... replicas) throws IOException { + for (IndexShard replica : replicas) { + final SegmentInfos replicaInfos = replica.store().readLastCommittedSegmentsInfo(); + final SegmentInfos primaryInfos = primary.store().readLastCommittedSegmentsInfo(); + final Map latestReplicaMetadata = replica.store().getSegmentMetadataMap(replicaInfos); + final Map latestPrimaryMetadata = primary.store().getSegmentMetadataMap(primaryInfos); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(latestPrimaryMetadata, latestReplicaMetadata); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.missing.isEmpty()); + } + } + + public void testReplicaReceivesLowerSegmentsGen() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("Test"); + replicateSegments(primary, shards.getReplicas()); + + shards.indexDocs(randomInt(10)); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + + assertEqualCommittedSegments(primary, replica); + } + } + + public void testReplicaRestarts() throws Exception { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. 
+ final int numDocs = shards.indexDocs(randomInt(10)); + + // refresh and copy the segments over. + if (randomBoolean()) { + flushShard(primary); + } + primary.refresh("Test"); + replicateSegments(primary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + assertDocCounts(primary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, numDocs); + } + + final int i1 = randomInt(5); + for (int i = 0; i < i1; i++) { + shards.indexDocs(randomInt(10)); + + // randomly restart a replica + final IndexShard replicaToRestart = getRandomReplica(shards); + replicaToRestart.close("restart", false); + replicaToRestart.store().close(); + shards.removeReplica(replicaToRestart); + final IndexShard newReplica = shards.addReplicaWithExistingPath( + replicaToRestart.shardPath(), + replicaToRestart.routingEntry().currentNodeId() + ); + shards.recoverReplica(newReplica); + + // refresh and push segments to our other replicas. + if (randomBoolean()) { + failAndPromoteRandomReplica(shards); + } + flushShard(shards.getPrimary()); + replicateSegments(shards.getPrimary(), shards.getReplicas()); + } + primary = shards.getPrimary(); + + // refresh and push segments to our other replica. 
+ flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterReplication = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterReplication)); + } + } + } + public void testNRTReplicaPromotedAsPrimary() throws Exception { try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { shards.startAll(); @@ -523,4 +691,19 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept assertEquals("Should have resolved listener with failure", 0, latch.getCount()); assertNull(targetService.get(target.getId())); } + + private IndexShard getRandomReplica(ReplicationGroup shards) { + return shards.getReplicas().get(randomInt(shards.getReplicas().size() - 1)); + } + + private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws IOException { + IndexShard primary = shards.getPrimary(); + final IndexShard newPrimary = getRandomReplica(shards); + shards.promoteReplicaToPrimary(newPrimary); + primary.close("demoted", true); + primary.store().close(); + primary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); + shards.recoverReplica(primary); + return newPrimary; + } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 3302ffd810bd4..a331f09b99021 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -139,6 +139,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -1251,10 +1252,10 @@ public final List replicateSegments( List replicaShards ) throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); - Store.MetadataSnapshot primaryMetadata; + Map primaryMetadata; try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); - primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos); + primaryMetadata = primaryShard.store().getSegmentMetadataMap(primarySegmentInfos); } List ids = new ArrayList<>(); for (IndexShard replica : replicaShards) { @@ -1266,12 +1267,11 @@ public final List replicateSegments( public void onReplicationDone(SegmentReplicationState state) { try (final GatedCloseable snapshot = replica.getSegmentInfosSnapshot()) { final SegmentInfos replicaInfos = snapshot.get(); - final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos); - final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata); + final Map replicaMetadata = replica.store().getSegmentMetadataMap(replicaInfos); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata); assertTrue(recoveryDiff.missing.isEmpty()); assertTrue(recoveryDiff.different.isEmpty()); assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); - assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData()); } catch (Exception e) { throw ExceptionsHelper.convertToRuntime(e); } finally {