@@ -46,11 +46,22 @@ public class NRTReplicationEngineTests extends EngineTestCase {
4646 Settings .builder ().put (IndexMetadata .SETTING_REPLICATION_TYPE , ReplicationType .SEGMENT ).build ()
4747 );
4848
49+ private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule .newIndexSettings (
50+ "index" ,
51+ Settings .builder ()
52+ .put (IndexMetadata .SETTING_REPLICATION_TYPE , ReplicationType .SEGMENT )
53+ .put (IndexMetadata .SETTING_REMOTE_STORE_ENABLED , "true" )
54+ .put (IndexMetadata .SETTING_REMOTE_TRANSLOG_STORE_ENABLED , "true" )
55+ .put (IndexMetadata .SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY , "translog-repo" )
56+ .put (IndexMetadata .SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL , "100ms" )
57+ .build ()
58+ );
59+
4960 public void testCreateEngine () throws IOException {
5061 final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
5162 try (
5263 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
53- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
64+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
5465 ) {
5566 final SegmentInfos latestSegmentInfos = nrtEngine .getLatestSegmentInfos ();
5667 final SegmentInfos lastCommittedSegmentInfos = nrtEngine .getLastCommittedSegmentInfos ();
@@ -74,7 +85,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {
7485
7586 try (
7687 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
77- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
88+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
7889 ) {
7990 List <Engine .Operation > operations = generateHistoryOnReplica (
8091 between (1 , 500 ),
@@ -115,7 +126,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
115126
116127 try (
117128 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
118- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
129+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
119130 ) {
120131 // assume we start at the same gen.
121132 assertEquals (2 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
@@ -131,13 +142,36 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
131142 }
132143 }
133144
145+ public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled () throws IOException {
146+ final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
147+
148+ try (
149+ final Store nrtEngineStore = createStore (REMOTE_STORE_INDEX_SETTINGS , newDirectory ());
150+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , REMOTE_STORE_INDEX_SETTINGS )
151+ ) {
152+ // assume we start at the same gen.
153+ assertEquals (2 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
154+ assertEquals (nrtEngine .getLatestSegmentInfos ().getGeneration (), nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
155+ assertEquals (engine .getLatestSegmentInfos ().getGeneration (), nrtEngine .getLatestSegmentInfos ().getGeneration ());
156+
157+ // flush the primary engine - we don't need any segments, just force a new commit point.
158+ engine .flush (true , true );
159+ assertEquals (3 , engine .getLatestSegmentInfos ().getGeneration ());
160+
161+ // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
162+ nrtEngine .updateSegments (engine .getLatestSegmentInfos ());
163+ assertEquals (2 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
164+ assertEquals (2 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
165+ }
166+ }
167+
134168 public void testUpdateSegments_replicaReceivesSISWithLowerGen () throws IOException {
135169 // if the replica is already at segments_N that is received, it will commit segments_N+1.
136170 final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
137171
138172 try (
139173 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
140- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
174+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
141175 ) {
142176 nrtEngine .getLatestSegmentInfos ().changed ();
143177 nrtEngine .getLatestSegmentInfos ().changed ();
@@ -160,12 +194,42 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
160194 }
161195 }
162196
197+ public void testUpdateSegments_replicaReceivesSISWithLowerGen_remoteStoreEnabled () throws IOException {
198+ // if the replica is already at segments_N that is received, it will commit segments_N+1.
199+ final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
200+
201+ try (
202+ final Store nrtEngineStore = createStore (REMOTE_STORE_INDEX_SETTINGS , newDirectory ());
203+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , REMOTE_STORE_INDEX_SETTINGS )
204+ ) {
205+ nrtEngine .getLatestSegmentInfos ().changed ();
206+ nrtEngine .getLatestSegmentInfos ().changed ();
207+ // commit the infos to push us to segments_3.
208+ nrtEngine .commitSegmentInfos ();
209+ assertEquals (3 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
210+ assertEquals (3 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
211+
212+ // update the replica with segments_2 from the primary.
213+ final SegmentInfos primaryInfos = engine .getLatestSegmentInfos ();
214+ assertEquals (2 , primaryInfos .getGeneration ());
215+
216+ nrtEngine .updateSegments (primaryInfos );
217+ assertEquals (3 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
218+ assertEquals (3 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
219+ assertEquals (primaryInfos .getVersion (), nrtEngine .getLatestSegmentInfos ().getVersion ());
220+ assert (primaryInfos .getVersion () < nrtEngine .getLastCommittedSegmentInfos ().getVersion ());
221+
222+ nrtEngine .close ();
223+ assertEquals (3 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
224+ }
225+ }
226+
163227 public void testUpdateSegments_replicaCommitsFirstReceivedInfos () throws IOException {
164228 final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
165229
166230 try (
167231 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
168- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
232+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
169233 ) {
170234 assertEquals (2 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
171235 assertEquals (2 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
@@ -184,12 +248,38 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
184248 }
185249 }
186250
251+ public void testUpdateSegments_replicaCommitsFirstReceivedInfos_remoteStoreEnabled () throws IOException {
252+ final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
253+
254+ try (
255+ final Store nrtEngineStore = createStore (REMOTE_STORE_INDEX_SETTINGS , newDirectory ());
256+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , REMOTE_STORE_INDEX_SETTINGS )
257+ ) {
258+ assertEquals (2 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
259+ assertEquals (2 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
260+ // bump the latest infos version a couple of times so that we can assert the correct version after commit.
261+ engine .getLatestSegmentInfos ().changed ();
262+ engine .getLatestSegmentInfos ().changed ();
263+ assertNotEquals (nrtEngine .getLatestSegmentInfos ().getVersion (), engine .getLatestSegmentInfos ().getVersion ());
264+
265+ // update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed.
266+ final SegmentInfos primaryInfos = engine .getLatestSegmentInfos ();
267+ assertEquals (2 , primaryInfos .getGeneration ());
268+ nrtEngine .updateSegments (primaryInfos );
269+
270+ // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
271+ final SegmentInfos lastCommittedSegmentInfos = nrtEngine .getLastCommittedSegmentInfos ();
272+ assertEquals (primaryInfos .getVersion (), nrtEngine .getLatestSegmentInfos ().getVersion ());
273+ assert (primaryInfos .getVersion () > lastCommittedSegmentInfos .getVersion ());
274+ }
275+ }
276+
187277 public void testRefreshOnNRTEngine () throws IOException {
188278 final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .NO_OPS_PERFORMED );
189279
190280 try (
191281 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
192- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
282+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
193283 ) {
194284 assertEquals (2 , nrtEngine .getLastCommittedSegmentInfos ().getGeneration ());
195285 assertEquals (2 , nrtEngine .getLatestSegmentInfos ().getGeneration ());
@@ -211,7 +301,7 @@ public void testTrimTranslogOps() throws Exception {
211301
212302 try (
213303 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
214- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore );
304+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings );
215305 ) {
216306 List <Engine .Operation > operations = generateHistoryOnReplica (
217307 between (1 , 100 ),
@@ -247,7 +337,7 @@ public void testCommitSegmentInfos() throws Exception {
247337
248338 try (
249339 final Store nrtEngineStore = createStore (INDEX_SETTINGS , newDirectory ());
250- final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore )
340+ final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine (globalCheckpoint , nrtEngineStore , defaultSettings )
251341 ) {
252342 List <Engine .Operation > operations = generateHistoryOnReplica (between (1 , 500 ), randomBoolean (), randomBoolean (), randomBoolean ())
253343 .stream ()
@@ -282,18 +372,11 @@ public void testCommitSegmentInfos() throws Exception {
282372 }
283373 }
284374
285- private NRTReplicationEngine buildNrtReplicaEngine (AtomicLong globalCheckpoint , Store store ) throws IOException {
375+ private NRTReplicationEngine buildNrtReplicaEngine (AtomicLong globalCheckpoint , Store store , IndexSettings settings )
376+ throws IOException {
286377 Lucene .cleanLuceneIndex (store .directory ());
287378 final Path translogDir = createTempDir ();
288- final EngineConfig replicaConfig = config (
289- defaultSettings ,
290- store ,
291- translogDir ,
292- NoMergePolicy .INSTANCE ,
293- null ,
294- null ,
295- globalCheckpoint ::get
296- );
379+ final EngineConfig replicaConfig = config (settings , store , translogDir , NoMergePolicy .INSTANCE , null , null , globalCheckpoint ::get );
297380 if (Lucene .indexExists (store .directory ()) == false ) {
298381 store .createEmpty (replicaConfig .getIndexSettings ().getIndexVersionCreated ().luceneVersion );
299382 final String translogUuid = Translog .createEmptyTranslog (
0 commit comments