Skip to content

Commit f43fc64

Browse files
committed
address review comments
Signed-off-by: Poojita Raj <[email protected]>
1 parent 262949b commit f43fc64

File tree

8 files changed

+42
-54
lines changed

8 files changed

+42
-54
lines changed

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine {
5656
private final CompletionStatsCache completionStatsCache;
5757
private final LocalCheckpointTracker localCheckpointTracker;
5858
private final WriteOnlyTranslogManager translogManager;
59+
private final boolean shouldCommit;
5960

6061
private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
6162

@@ -113,6 +114,7 @@ public void onAfterTranslogSync() {
113114
engineConfig.getPrimaryModeSupplier()
114115
);
115116
this.translogManager = translogManagerRef;
117+
this.shouldCommit = !engineConfig.getIndexSettings().isRemoteStoreEnabled();
116118
} catch (IOException e) {
117119
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
118120
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -138,16 +140,16 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
138140
ensureOpen();
139141
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
140142
final long incomingGeneration = infos.getGeneration();
141-
boolean remoteStoreEnabled = engineConfig.getIndexSettings().isRemoteStoreEnabled();
142-
readerManager.updateSegments(infos, remoteStoreEnabled);
143+
readerManager.updateSegments(infos);
143144

144145
// Commit and roll the translog when we receive a different generation than what was last received.
145146
// lower/higher gens are possible from a new primary that was just elected.
146147
if (incomingGeneration != lastReceivedGen) {
147-
if (remoteStoreEnabled == false) {
148+
if (shouldCommit) {
148149
commitSegmentInfos();
149150
} else {
150151
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
152+
translogManager.syncTranslog();
151153
}
152154
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
153155
translogManager.rollTranslogGeneration();
@@ -389,7 +391,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
389391
: "Either the write lock must be held or the engine must be currently be failing itself";
390392
try {
391393
// if remote store is enabled, all segments durably persisted
392-
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
394+
if (shouldCommit) {
393395
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
394396
/*
395397
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied

server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
9696
* Update this reader's segments and refresh.
9797
*
9898
* @param infos {@link SegmentInfos} infos
99-
* @param remoteStoreEnabled true if remote store is enabled
10099
* @throws IOException - When Refresh fails with an IOException.
101100
*/
102-
public synchronized void updateSegments(SegmentInfos infos, boolean remoteStoreEnabled) throws IOException {
101+
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
103102
// roll over the currentInfo's generation, this ensures the on-disk gen
104-
// is always increased (in the case remote store is not enabled)
105-
if (remoteStoreEnabled == false) {
106-
infos.updateGeneration(currentInfos);
107-
}
103+
// is always increased.
104+
infos.updateGeneration(currentInfos);
108105
currentInfos = infos;
109106
maybeRefresh();
110107
}

server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
245245
indexShard.prepareForIndexRecovery();
246246
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
247247
if (hasRemoteSegmentStore) {
248-
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, false);
248+
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
249249
}
250250
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
251251
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();

server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testCreateEngine() throws IOException {
6161
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
6262
try (
6363
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
64-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
64+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
6565
) {
6666
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
6767
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
@@ -85,7 +85,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {
8585

8686
try (
8787
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
88-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
88+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
8989
) {
9090
List<Engine.Operation> operations = generateHistoryOnReplica(
9191
between(1, 500),
@@ -126,7 +126,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
126126

127127
try (
128128
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
129-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
129+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
130130
) {
131131
// assume we start at the same gen.
132132
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
@@ -161,7 +161,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnable
161161
// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
162162
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
163163
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
164-
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
164+
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
165165
}
166166
}
167167

@@ -171,7 +171,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
171171

172172
try (
173173
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
174-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
174+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
175175
) {
176176
nrtEngine.getLatestSegmentInfos().changed();
177177
nrtEngine.getLatestSegmentInfos().changed();
@@ -198,7 +198,7 @@ public void testSimultaneousEngineCloseAndCommit() throws IOException, Interrupt
198198
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
199199
try (
200200
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
201-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
201+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
202202
) {
203203
CountDownLatch latch = new CountDownLatch(1);
204204
Thread commitThread = new Thread(() -> {
@@ -231,7 +231,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
231231

232232
try (
233233
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
234-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
234+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
235235
) {
236236
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
237237
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
@@ -255,7 +255,7 @@ public void testRefreshOnNRTEngine() throws IOException {
255255

256256
try (
257257
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
258-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
258+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
259259
) {
260260
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
261261
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
@@ -277,7 +277,7 @@ public void testTrimTranslogOps() throws Exception {
277277

278278
try (
279279
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
280-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings);
280+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
281281
) {
282282
List<Engine.Operation> operations = generateHistoryOnReplica(
283283
between(1, 100),
@@ -313,7 +313,7 @@ public void testCommitSegmentInfos() throws Exception {
313313

314314
try (
315315
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
316-
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
316+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
317317
) {
318318
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
319319
.stream()
@@ -365,4 +365,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint,
365365
}
366366
return new NRTReplicationEngine(replicaConfig);
367367
}
368+
369+
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
370+
return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings);
371+
}
368372
}

server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88

99
package org.opensearch.index.shard;
1010

11-
import org.apache.lucene.store.Directory;
12-
import org.apache.lucene.store.FilterDirectory;
13-
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
1411
import org.junit.Assert;
1512
import org.opensearch.cluster.metadata.IndexMetadata;
1613
import org.opensearch.cluster.routing.RecoverySource;
@@ -61,8 +58,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
6158
final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata())
6259
.primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1)
6360
.build();
64-
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
65-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
61+
syncDirectory(replica);
6662
closeShards(replica);
6763
shards.removeReplica(replica);
6864

@@ -114,11 +110,8 @@ public IndexShard indexShard() {
114110
replicateSegments(primary, shards.getReplicas());
115111
shards.assertAllEqual(numDocs + moreDocs);
116112

117-
storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
118-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
119-
120-
storeDirectory = ((FilterDirectory) ((FilterDirectory) newReplicaShard.store().directory()).getDelegate()).getDelegate();
121-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
113+
syncDirectory(primary);
114+
syncDirectory(newReplicaShard);
122115
}
123116
}
124117

@@ -151,11 +144,9 @@ public void testNoTranslogHistoryTransferred() throws Exception {
151144
replicateSegments(primary, shards.getReplicas());
152145
shards.assertAllEqual(numDocs + moreDocs);
153146

154-
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
155-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
156-
157-
storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
158-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
147+
syncDirectory(primary);
148+
syncDirectory(replica);
159149
}
160150
}
151+
161152
}

server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88

99
package org.opensearch.index.shard;
1010

11-
import org.apache.lucene.store.Directory;
12-
import org.apache.lucene.store.FilterDirectory;
13-
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
1411
import org.opensearch.cluster.metadata.IndexMetadata;
1512
import org.opensearch.common.settings.Settings;
1613
import org.opensearch.index.engine.NRTReplicationEngineFactory;
@@ -42,10 +39,8 @@ public void testReplicaSyncingFromRemoteStore() throws IOException {
4239
replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
4340
assertDocs(replicaShard, "1", "2");
4441

45-
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primaryShard.store().directory()).getDelegate()).getDelegate();
46-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
47-
storeDirectory = ((FilterDirectory) ((FilterDirectory) replicaShard.store().directory()).getDelegate()).getDelegate();
48-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
42+
syncDirectory(primaryShard);
43+
syncDirectory(replicaShard);
4944
closeShards(primaryShard, replicaShard);
5045
}
5146
}

server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88

99
package org.opensearch.indices.recovery;
1010

11-
import org.apache.lucene.store.Directory;
12-
import org.apache.lucene.store.FilterDirectory;
13-
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
1411
import org.opensearch.cluster.metadata.IndexMetadata;
1512
import org.opensearch.common.settings.Settings;
1613
import org.opensearch.index.IndexSettings;
@@ -67,14 +64,9 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
6764
assertEquals(1, primary.getRetentionLeases().leases().size());
6865
assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry())));
6966

70-
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
71-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
72-
73-
storeDirectory = ((FilterDirectory) ((FilterDirectory) replica1.store().directory()).getDelegate()).getDelegate();
74-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
75-
76-
storeDirectory = ((FilterDirectory) ((FilterDirectory) replica2.store().directory()).getDelegate()).getDelegate();
77-
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
67+
syncDirectory(primary);
68+
syncDirectory(replica1);
69+
syncDirectory(replica2);
7870
}
7971
}
8072
}

test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,4 +1204,11 @@ protected void performOnReplica(RetentionLeaseSyncAction.Request request, IndexS
12041204
}
12051205
}
12061206

1207+
/**
1208+
* Syncs all files in directory of shard
1209+
*/
1210+
protected void syncDirectory(IndexShard shard) throws IOException {
1211+
shard.store().directory().sync(List.of(shard.store().directory().listAll()));
1212+
}
1213+
12071214
}

0 commit comments

Comments
 (0)