diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3fdc9aaecdb45..3d8cb2be65a7d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fixed Staggered merge - load average replace with AverageTrackers, some Default thresholds modified ([#18666](https://github.com/opensearch-project/OpenSearch/pull/18666))
 - Use `new SecureRandom()` to avoid blocking ([18729](https://github.com/opensearch-project/OpenSearch/issues/18729))
 - Use ScoreDoc instead of FieldDoc when creating TopScoreDocCollectorManager to avoid unnecessary conversion ([#18802](https://github.com/opensearch-project/OpenSearch/pull/18802))
+- Fix leafSorter optimization for ReadOnlyEngine and NRTReplicationEngine ([#18639](https://github.com/opensearch-project/OpenSearch/pull/18639))
 
 ### Security
 
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index 12c0fea42bb2f..cb4e4122702b9 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -150,7 +150,8 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException {
         return new NRTReplicationReaderManager(
             OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
             replicaFileTracker::incRef,
-            replicaFileTracker::decRef
+            replicaFileTracker::decRef,
+            engineConfig
         );
     }
 
@@ -537,6 +538,9 @@ protected LocalCheckpointTracker getLocalCheckpointTracker() {
 
     private DirectoryReader getDirectoryReader() throws IOException {
         // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
-        return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
+        return new SoftDeletesDirectoryReaderWrapper(
+            DirectoryReader.open(store.directory(), engineConfig.getLeafSorter()),
+            Lucene.SOFT_DELETES_FIELD
+        );
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
index 7b4c93c7235fe..47dacc321a3f7 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
@@ -39,6 +39,7 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
     private volatile SegmentInfos currentInfos;
     private Consumer<Collection<String>> onReaderClosed;
     private Consumer<Collection<String>> onNewReader;
+    private final EngineConfig engineConfig;
 
     /**
      * Creates and returns a new SegmentReplicationReaderManager from the given
@@ -48,16 +49,19 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
      * @param reader - The SegmentReplicationReaderManager to use for future reopens.
      * @param onNewReader - Called when a new reader is created.
      * @param onReaderClosed - Called when a reader is closed.
+     * @param engineConfig - The engine configuration containing leafSorter.
      */
     NRTReplicationReaderManager(
         OpenSearchDirectoryReader reader,
         Consumer<Collection<String>> onNewReader,
-        Consumer<Collection<String>> onReaderClosed
+        Consumer<Collection<String>> onReaderClosed,
+        EngineConfig engineConfig
     ) {
         super(reader);
         currentInfos = unwrapStandardReader(reader).getSegmentInfos();
         this.onNewReader = onNewReader;
         this.onReaderClosed = onReaderClosed;
+        this.engineConfig = engineConfig;
     }
 
     @Override
@@ -75,7 +79,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
         // Segment_n here is ignored because it is either already committed on disk as part of previous commit point or
         // does not yet exist on store (not yet committed)
         final Collection<String> files = currentInfos.files(false);
-        DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
+        DirectoryReader innerReader = StandardDirectoryReader.open(
+            referenceToRefresh.directory(),
+            currentInfos,
+            subs,
+            engineConfig.getLeafSorter()
+        );
         final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
             innerReader,
             Lucene.SOFT_DELETES_FIELD
diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
index ac8e123e49204..4a1d5efbdb1bb 100644
--- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
@@ -78,7 +78,7 @@ public NoOpEngine(EngineConfig config) {
         super(config, null, null, true, Function.identity(), true);
         this.segmentsStats = new SegmentsStats();
         Directory directory = store.directory();
-        try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings().isSoftDeleteEnabled())) {
+        try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings().isSoftDeleteEnabled(), config.getLeafSorter())) {
             for (LeafReaderContext ctx : reader.getContext().leaves()) {
                 SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
                 fillSegmentStats(segmentReader, true, segmentsStats);
diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
index 4e87ffd6adb1e..174f72f5ace9e 100644
--- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
@@ -34,6 +34,7 @@
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
 import org.apache.lucene.search.ReferenceManager;
@@ -61,6 +62,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.BiFunction;
@@ -505,6 +507,17 @@ protected static DirectoryReader openDirectory(Directory directory, boolean wrap
         }
     }
 
+    protected static DirectoryReader openDirectory(Directory directory, boolean wrapSoftDeletes, Comparator<LeafReader> leafSorter)
+        throws IOException {
+        assert Transports.assertNotTransportThread("opening directory reader of a read-only engine");
+        final DirectoryReader reader = DirectoryReader.open(directory, leafSorter);
+        if (wrapSoftDeletes) {
+            return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+        } else {
reader; + } + } + @Override public CompletionStats completionStats(String... fieldNamePatterns) { return completionStatsCache.get(fieldNamePatterns); diff --git a/server/src/test/java/org/opensearch/index/engine/LeafSorterOptimizationTests.java b/server/src/test/java/org/opensearch/index/engine/LeafSorterOptimizationTests.java new file mode 100644 index 0000000000000..af0d74ae06e98 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/LeafSorterOptimizationTests.java @@ -0,0 +1,329 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.opensearch.Version; +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.BigArrays; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.index.VersionType; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogConfig; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Comparator; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +public class LeafSorterOptimizationTests extends EngineTestCase { + + public void testReadOnlyEngineUsesLeafSorter() throws IOException { + Path translogPath = createTempDir(); + try (Store store = createStore()) { + store.createEmpty(Version.CURRENT.luceneVersion); + final String translogUUID = Translog.createEmptyTranslog( + translogPath, + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + primaryTerm.get() + ); + store.associateIndexWithNewTranslog(translogUUID); + Comparator leafSorter = Comparator.comparingInt(LeafReader::maxDoc); + EngineConfig config = new EngineConfig.Builder().shardId(shardId) + .threadPool(threadPool) + .indexSettings(defaultSettings) + .warmer(null) + .store(store) + .mergePolicy(newMergePolicy()) + .analyzer(newIndexWriterConfig().getAnalyzer()) + .similarity(newIndexWriterConfig().getSimilarity()) + .codecService(new CodecService(null, defaultSettings, logger)) + .eventListener(new Engine.EventListener() { + }) + .translogConfig(new TranslogConfig(shardId, translogPath, defaultSettings, BigArrays.NON_RECYCLING_INSTANCE, "", false)) + .flushMergesAfter(TimeValue.timeValueMinutes(5)) + .retentionLeasesSupplier(() -> RetentionLeases.EMPTY) + .primaryTermSupplier(primaryTerm) + .tombstoneDocSupplier(tombstoneDocSupplier()) + .externalRefreshListener(java.util.Collections.emptyList()) + .internalRefreshListener(java.util.Collections.emptyList()) + .queryCache(IndexSearcher.getDefaultQueryCache()) + .queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy()) + .globalCheckpointSupplier(() -> 
+                .globalCheckpointSupplier(() -> SequenceNumbers.NO_OPS_PERFORMED)
+                .leafSorter(leafSorter)
+                .build();
+            long maxSeqNo;
+            // Index docs with InternalEngine, then open ReadOnlyEngine
+            try (InternalEngine engine = new InternalEngine(config)) {
+                TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings(), engine);
+                engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
+                for (int i = 0; i < 10; i++) {
+                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+                    engine.index(
+                        new Engine.Index(
+                            newUid(doc),
+                            doc,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO,
+                            primaryTerm.get(),
+                            Versions.MATCH_DELETED,
+                            VersionType.INTERNAL,
+                            Engine.Operation.Origin.PRIMARY,
+                            System.nanoTime(),
+                            -1,
+                            false,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO,
+                            0
+                        )
+                    );
+                    if ((i + 1) % 2 == 0) {
+                        engine.flush();
+                    }
+                }
+                engine.refresh("test");
+                engine.flush();
+                maxSeqNo = engine.getSeqNoStats(-1).getMaxSeqNo();
+            }
+            // Now open ReadOnlyEngine and check leaf order
+            EngineConfig readOnlyConfig = new EngineConfig.Builder().shardId(shardId)
+                .threadPool(threadPool)
+                .indexSettings(defaultSettings)
+                .warmer(null)
+                .store(store)
+                .mergePolicy(newMergePolicy())
+                .analyzer(newIndexWriterConfig().getAnalyzer())
+                .similarity(newIndexWriterConfig().getSimilarity())
+                .codecService(new CodecService(null, defaultSettings, logger))
+                .eventListener(new Engine.EventListener() {
+                })
+                .translogConfig(new TranslogConfig(shardId, translogPath, defaultSettings, BigArrays.NON_RECYCLING_INSTANCE, "", false))
+                .flushMergesAfter(TimeValue.timeValueMinutes(5))
+                .retentionLeasesSupplier(() -> RetentionLeases.EMPTY)
+                .primaryTermSupplier(primaryTerm)
+                .tombstoneDocSupplier(tombstoneDocSupplier())
+                .externalRefreshListener(java.util.Collections.emptyList())
+                .internalRefreshListener(java.util.Collections.emptyList())
+                .queryCache(IndexSearcher.getDefaultQueryCache())
+                .queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy())
+                .globalCheckpointSupplier(() -> maxSeqNo)
+                .leafSorter(leafSorter)
+                .build();
+            try (
+                ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(
+                    readOnlyConfig,
+                    null,
+                    null,
+                    true,
+                    java.util.function.Function.identity(),
+                    true
+                )
+            ) {
+                try (Engine.Searcher searcher = readOnlyEngine.acquireSearcher("test")) {
+                    DirectoryReader reader = (DirectoryReader) searcher.getDirectoryReader();
+                    assertThat("Should have multiple leaves", reader.leaves().size(), greaterThan(0));
+                    java.util.List<Integer> actualOrder = new java.util.ArrayList<>();
+                    for (org.apache.lucene.index.LeafReaderContext ctx : reader.leaves()) {
+                        actualOrder.add(ctx.reader().maxDoc());
+                    }
+                    java.util.List<Integer> expectedOrder = new java.util.ArrayList<>(actualOrder);
+                    expectedOrder.sort(Integer::compareTo);
+                    assertEquals("Leaves should be sorted by maxDoc ascending", expectedOrder, actualOrder);
+                }
+            }
+        }
+    }
+
+    public void testInternalEngineUsesLeafSorter() throws IOException {
+        Path translogPath = createTempDir();
+        try (Store store = createStore()) {
+            store.createEmpty(Version.CURRENT.luceneVersion);
+            final String translogUUID = Translog.createEmptyTranslog(
+                translogPath,
+                SequenceNumbers.NO_OPS_PERFORMED,
+                shardId,
+                primaryTerm.get()
+            );
+            store.associateIndexWithNewTranslog(translogUUID);
+            Comparator<LeafReader> leafSorter = Comparator.comparingInt(LeafReader::maxDoc).reversed();
+            EngineConfig config = new EngineConfig.Builder().shardId(shardId)
+                .threadPool(threadPool)
+                .indexSettings(defaultSettings)
+                .warmer(null)
+                .store(store)
+                .mergePolicy(newMergePolicy())
+                .analyzer(newIndexWriterConfig().getAnalyzer())
+                .similarity(newIndexWriterConfig().getSimilarity())
+                .codecService(new CodecService(null, defaultSettings, logger))
+                .eventListener(new Engine.EventListener() {
+                })
+                .translogConfig(new TranslogConfig(shardId, translogPath, defaultSettings, BigArrays.NON_RECYCLING_INSTANCE, "", false))
+                .flushMergesAfter(TimeValue.timeValueMinutes(5))
+                .retentionLeasesSupplier(() -> RetentionLeases.EMPTY)
+                .primaryTermSupplier(primaryTerm)
+                .tombstoneDocSupplier(tombstoneDocSupplier())
+                .externalRefreshListener(java.util.Collections.emptyList())
+                .internalRefreshListener(java.util.Collections.emptyList())
+                .queryCache(IndexSearcher.getDefaultQueryCache())
+                .queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy())
+                .globalCheckpointSupplier(() -> SequenceNumbers.NO_OPS_PERFORMED)
+                .leafSorter(leafSorter)
+                .build();
+            try (InternalEngine engine = new InternalEngine(config)) {
+                TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings(), engine);
+                engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
+                for (int i = 0; i < 20; i++) {
+                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+                    engine.index(
+                        new Engine.Index(
+                            newUid(doc),
+                            doc,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO,
+                            primaryTerm.get(),
+                            Versions.MATCH_DELETED,
+                            VersionType.INTERNAL,
+                            Engine.Operation.Origin.PRIMARY,
+                            System.nanoTime(),
+                            -1,
+                            false,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO,
+                            0
+                        )
+                    );
+                    if ((i + 1) % 5 == 0) {
+                        engine.flush();
+                    }
+                }
+                engine.refresh("test");
+                try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+                    DirectoryReader reader = (DirectoryReader) searcher.getDirectoryReader();
+                    assertThat("Should have multiple leaves", reader.leaves().size(), greaterThan(0));
+                    java.util.List<Integer> actualOrder = new java.util.ArrayList<>();
+                    for (org.apache.lucene.index.LeafReaderContext ctx : reader.leaves()) {
+                        actualOrder.add(ctx.reader().maxDoc());
+                    }
+                    java.util.List<Integer> expectedOrder = new java.util.ArrayList<>(actualOrder);
+                    expectedOrder.sort((a, b) -> Integer.compare(b, a));
+                    assertEquals("Leaves should be sorted by maxDoc descending", expectedOrder, actualOrder);
+                }
+            }
+        }
+    }
+
+    public void testTimestampSortOptimizationWorksOnAllEngineTypes() throws IOException {
+        // Simplified: Only test that InternalEngine respects the leafSorter logic
+        Path translogPath = createTempDir();
+        try (Store store = createStore()) {
+            store.createEmpty(Version.CURRENT.luceneVersion);
+            final String translogUUID = Translog.createEmptyTranslog(
+                translogPath,
+                SequenceNumbers.NO_OPS_PERFORMED,
+                shardId,
+                primaryTerm.get()
+            );
+            store.associateIndexWithNewTranslog(translogUUID);
+            Comparator<LeafReader> leafSorter = Comparator.comparingInt(LeafReader::maxDoc).reversed();
+            EngineConfig config = new EngineConfig.Builder().shardId(shardId)
+                .threadPool(threadPool)
+                .indexSettings(defaultSettings)
+                .warmer(null)
+                .store(store)
+                .mergePolicy(newMergePolicy())
+                .analyzer(newIndexWriterConfig().getAnalyzer())
+                .similarity(newIndexWriterConfig().getSimilarity())
+                .codecService(new CodecService(null, defaultSettings, logger))
+                .eventListener(new Engine.EventListener() {
+                })
+                .translogConfig(new TranslogConfig(shardId, translogPath, defaultSettings, BigArrays.NON_RECYCLING_INSTANCE, "", false))
+                .flushMergesAfter(TimeValue.timeValueMinutes(5))
+                .retentionLeasesSupplier(() -> RetentionLeases.EMPTY)
+                .primaryTermSupplier(primaryTerm)
+                .tombstoneDocSupplier(tombstoneDocSupplier())
+                .externalRefreshListener(java.util.Collections.emptyList())
+                .internalRefreshListener(java.util.Collections.emptyList())
+                .queryCache(IndexSearcher.getDefaultQueryCache())
+                .queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy())
+                .globalCheckpointSupplier(() -> SequenceNumbers.NO_OPS_PERFORMED)
+                .leafSorter(leafSorter)
+                .build();
+            try (InternalEngine engine = new InternalEngine(config)) {
+                TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings(), engine);
+                engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
+                for (int i = 0; i < 20; i++) {
+                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+                    engine.index(
+                        new Engine.Index(
+                            newUid(doc),
+                            doc,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO,
+                            primaryTerm.get(),
+                            Versions.MATCH_DELETED,
+                            VersionType.INTERNAL,
+                            Engine.Operation.Origin.PRIMARY,
+                            System.nanoTime(),
+                            -1,
+                            false,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO,
+                            0
+                        )
+                    );
+                    if ((i + 1) % 5 == 0) {
+                        engine.flush();
+                    }
+                }
+                engine.refresh("test");
+                try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+                    DirectoryReader reader = (DirectoryReader) searcher.getDirectoryReader();
+                    assertThat("Should have multiple leaves", reader.leaves().size(), greaterThan(0));
+                    java.util.List<Integer> actualOrder = new java.util.ArrayList<>();
+                    for (org.apache.lucene.index.LeafReaderContext ctx : reader.leaves()) {
+                        actualOrder.add(ctx.reader().maxDoc());
+                    }
+                    java.util.List<Integer> expectedOrder = new java.util.ArrayList<>(actualOrder);
+                    expectedOrder.sort((a, b) -> Integer.compare(b, a));
+                    assertEquals("Leaves should be sorted by maxDoc descending", expectedOrder, actualOrder);
+                }
+            }
+        }
+    }
+
+    private void testSortPerformance(Engine engine, String engineType) throws IOException {
+        try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
+            DirectoryReader reader = searcher.getDirectoryReader();
+            IndexSearcher indexSearcher = new IndexSearcher(reader);
+
+            // Create a sort by timestamp (descending)
+            Sort timestampSort = new Sort(new SortField("@timestamp", SortField.Type.LONG, true));
+
+            // Perform a sorted search
+            TopDocs topDocs = indexSearcher.search(new MatchAllDocsQuery(), 10, timestampSort);
+
+            // Verify that the search completed successfully
+            assertThat("Search should complete successfully on " + engineType, topDocs.totalHits.value(), greaterThan(0L));
+
+            // Verify that the engine has leafSorter configured
+            assertThat("Engine " + engineType + " should have leafSorter configured", engine.config().getLeafSorter(), notNullValue());
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java
index d635b38e811c4..2ea9ffde9e00d 100644
--- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java
@@ -11,12 +11,22 @@
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.util.Version;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.BigArrays;
+import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.index.codec.CodecService;
+import org.opensearch.index.seqno.RetentionLeases;
+import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.Store;
+import org.opensearch.index.translog.TranslogConfig;
 
 import java.io.IOException;
 
+import static java.util.Collections.emptyList;
+
 public class NRTReplicationReaderManagerTests extends EngineTestCase {
 
     public void testCreateNRTreaderManager() throws IOException {
@@ -24,10 +34,38 @@ public void testCreateNRTreaderManager() throws IOException {
         store.createEmpty(Version.LATEST);
         final DirectoryReader reader = DirectoryReader.open(store.directory());
         final SegmentInfos initialInfos = ((StandardDirectoryReader) reader).getSegmentInfos();
+
+        // Create a minimal engine config for testing
+        EngineConfig testConfig = new EngineConfig.Builder().shardId(shardId)
+            .threadPool(threadPool)
+            .indexSettings(defaultSettings)
+            .warmer(null)
+            .store(store)
+            .mergePolicy(newMergePolicy())
+            .analyzer(newIndexWriterConfig().getAnalyzer())
+            .similarity(newIndexWriterConfig().getSimilarity())
+            .codecService(new CodecService(null, defaultSettings, logger))
+            .eventListener(new Engine.EventListener() {
+            })
+            .queryCache(IndexSearcher.getDefaultQueryCache())
+            .queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy())
+            .translogConfig(new TranslogConfig(shardId, createTempDir(), defaultSettings, BigArrays.NON_RECYCLING_INSTANCE, "", false))
+            .flushMergesAfter(TimeValue.timeValueMinutes(5))
+            .externalRefreshListener(emptyList())
+            .internalRefreshListener(emptyList())
+            .indexSort(null)
+            .circuitBreakerService(new NoneCircuitBreakerService())
+            .globalCheckpointSupplier(() -> SequenceNumbers.NO_OPS_PERFORMED)
+            .retentionLeasesSupplier(() -> RetentionLeases.EMPTY)
+            .primaryTermSupplier(primaryTerm)
+            .tombstoneDocSupplier(tombstoneDocSupplier())
+            .build();
+
         NRTReplicationReaderManager readerManager = new NRTReplicationReaderManager(
             OpenSearchDirectoryReader.wrap(reader, shardId),
             (files) -> {},
-            (files) -> {}
+            (files) -> {},
+            testConfig
         );
         assertEquals(initialInfos, readerManager.getSegmentInfos());
        try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) {
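
Reviewer note: a minimal standalone sketch (not part of the patch) of the Lucene behavior the fix wires through. DirectoryReader.open(Directory, Comparator<LeafReader>) is the same overload the ReadOnlyEngine change above calls; it applies the supplied leaf sorter at open time, so reader.leaves() comes back in comparator order. The class name LeafSorterDemo and the in-memory directory are illustrative only, and the comparator mirrors the maxDoc-based sorters the tests configure via EngineConfig.Builder#leafSorter.

import java.io.IOException;
import java.util.Comparator;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class LeafSorterDemo {
    public static void main(String[] args) throws IOException {
        try (Directory dir = new ByteBuffersDirectory()) {
            // Build three segments of increasing size by committing between batches.
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                for (int segment = 1; segment <= 3; segment++) {
                    for (int doc = 0; doc < segment * 10; doc++) {
                        Document d = new Document();
                        d.add(new StringField("id", segment + "-" + doc, Field.Store.NO));
                        writer.addDocument(d);
                    }
                    writer.commit(); // seals the in-flight segment
                }
            }
            // Largest-first ordering, the same shape as the reversed maxDoc sorter in the tests above.
            Comparator<LeafReader> leafSorter = Comparator.comparingInt(LeafReader::maxDoc).reversed();
            try (DirectoryReader reader = DirectoryReader.open(dir, leafSorter)) {
                for (LeafReaderContext ctx : reader.leaves()) {
                    // Expected (assuming no merges collapsed the segments): 30, 20, 10.
                    System.out.println("leaf maxDoc=" + ctx.reader().maxDoc());
                }
            }
        }
    }
}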