From 595d46cf7a886ec16133918e854946a7f683ea28 Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 11 Mar 2026 11:17:28 +0100 Subject: [PATCH 1/4] Adjust ESIntegTestCase.getLiveDocs method to account for pruned sequence numbers The method ESIntegTestCase.getLiveDocs verifies that primary and replica have the same set of documents. This method must be adapted to account for sequence numbers that can be merged away on the shard if the IndexSettings.DISABLE_SEQUENCE_NUMBERS setting is enabled. This method was previously adjusted for synthetic id and synthetic sources to rely on the Engine's changes snapshot API to retrieve Lucene documents. At that time, LuceneChangesSnapshot and LuceneSyntheticSourceChangesSnapshot were changed to accommodate missing id/source. It was already a bit ugly, but now with _seq_no also pruned it would require even larger changes in those Lucene*ChangesSnapshot classes only for testing, since _seq_no values are loaded at a lower level in Lucene*ChangesSnapshot. So I changed ESIntegTestCase to no longer use the changes snapshot API, reverted the changes in the Lucene*ChangesSnapshot classes, and now simply bulk load documents from the reader directly. 
Relates #136305 --- .../index/engine/LuceneChangesSnapshot.java | 9 +- .../LuceneSyntheticSourceChangesSnapshot.java | 17 +- .../engine/SearchBasedChangesSnapshot.java | 49 +--- .../elasticsearch/test/ESIntegTestCase.java | 216 ++++++------------ 4 files changed, 76 insertions(+), 215 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 99ba6aed134cf..495deb14e9dea 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -35,7 +35,7 @@ /** * A {@link Translog.Snapshot} from changes in a Lucene index */ -public class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { +public final class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { private long lastSeenSeqNo; private int skippedOperations; private final boolean singleConsumer; @@ -63,7 +63,6 @@ public class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { * @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} */ - @SuppressWarnings("this-escape") public LuceneChangesSnapshot( MapperService mapperService, Engine.Searcher engineSearcher, @@ -252,7 +251,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; } else { - final String id = overrideId(fields.id(), leaf, segmentDocID); + final String id = fields.id(); if (isTombstone) { op = new Translog.Delete(id, seqNo, primaryTerm, version); assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete 
op but soft_deletes field is not set [" + op + "]"; @@ -264,7 +263,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { throw new MissingHistoryOperationsException( "source not found for seqno=" + seqNo + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo ); - } else if (skipDocsWithNullSource()) { + } else { skippedOperations++; return null; } @@ -276,7 +275,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { seqNo, primaryTerm, version, - overrideSource(source, leaf, segmentDocID), + source, fields.routing(), autoGeneratedIdTimestamp ); diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java index a113c6d139a0c..673c60c775f51 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java @@ -41,7 +41,7 @@ * The {@code maxMemorySizeInBytes} parameter limits the total size of uncompressed _sources * loaded into memory during batch retrieval. 
*/ -public class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot { +public final class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot { private final long maxMemorySizeInBytes; private final StoredFieldLoader storedFieldLoader; private final SourceLoader sourceLoader; @@ -203,7 +203,7 @@ var record = documentRecords.get(j); if (record.isTombstone()) { continue; } - if (record.hasRecoverySourceSize() == false && skipDocsWithNullSource()) { + if (record.hasRecoverySourceSize() == false) { assert requiredFullRange == false : "source not found for seqno=" + record.seqNo(); continue; } @@ -243,12 +243,7 @@ private Translog.Operation createOperation( return new Translog.NoOp(docRecord.seqNo(), docRecord.primaryTerm(), "null"); } else if (docRecord.isTombstone()) { assert assertDocSoftDeleted(context.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + docRecord + "]"; - return new Translog.Delete( - overrideId(fieldLoader.id(), context, segmentDocID), - docRecord.seqNo(), - docRecord.primaryTerm(), - docRecord.version() - ); + return new Translog.Delete(fieldLoader.id(), docRecord.seqNo(), docRecord.primaryTerm(), docRecord.version()); } else { if (docRecord.hasRecoverySourceSize() == false) { // TODO: Callers should ask for the range that source should be retained. Thus we should always @@ -257,18 +252,18 @@ private Translog.Operation createOperation( throw new MissingHistoryOperationsException( "source not found for seqno=" + docRecord.seqNo() + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo ); - } else if (skipDocsWithNullSource()) { + } else { skippedOperations++; return null; } } var source = addSyntheticFields(sourceLoader.source(fieldLoader, segmentDocID), segmentDocID); return new Translog.Index( - overrideId(fieldLoader.id(), context, segmentDocID), + fieldLoader.id(), docRecord.seqNo(), docRecord.primaryTerm(), docRecord.version(), - overrideSource(source != null ? 
source.internalSourceRef() : null, context, segmentDocID), + source != null ? source.internalSourceRef() : null, fieldLoader.routing(), -1 // autogenerated timestamp ); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java index 48dd53a0b2649..8e037279e813e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java @@ -21,7 +21,6 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldCollectorManager; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.IOUtils; @@ -100,7 +99,7 @@ protected SearchBasedChangesSnapshot( this.toSeqNo = toSeqNo; this.lastSeenSeqNo = fromSeqNo - 1; this.requiredFullRange = requiredFullRange; - this.indexSearcher = createIndexSearcher(engineSearcher); + this.indexSearcher = newIndexSearcher(engineSearcher); this.indexSearcher.setQueryCache(null); long requestingSize = (toSeqNo - fromSeqNo == Long.MAX_VALUE) ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); @@ -111,18 +110,6 @@ protected SearchBasedChangesSnapshot( this.sourceMetadataFetcher = createSourceMetadataValueFetcher(mapperService, indexSearcher); } - /** - * Creates the {@link IndexSearcher} used to list the documents to include in the snapshot. By default, all documents are returned - * including the non-live ones. This method can be overridden in test to only return live documents. 
- * - * @param engineSearcher the {@link Engine.Searcher} to create the {@link IndexSearcher} from - * @return an {@link IndexSearcher} - * @throws IOException if something goes wrong - */ - protected IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) throws IOException { - return newIndexSearcher(engineSearcher); - } - private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperService, IndexSearcher searcher) { if (mapperService.mappingLookup().inferenceFields().isEmpty()) { return null; @@ -135,40 +122,6 @@ private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperServic : null; } - /** - * @return if true, documents with _source disabled are also returned. This shouldn't be the case in peer-recoveries where the source is - * retained but this method exist to allow tests classes to also list documents without source. - */ - protected boolean skipDocsWithNullSource() { - return true; - } - - /** - * Allows test classes to override the id of documents loaded in the snapshot. This is useful when the id of the document is null after - * having being trimmed during merges but the test class wants to verify the synthetic id. - * - * @param id the document id - * @param leaf the segment reader - * @param segmentDocID the document ID in the segment - * @return a non-null value for the document id - */ - protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) { - return id; - } - - /** - * Allows test classes to override the source of documents loaded in the snapshot. This is useful when the source of the document is - * null after having being trimmed during merges but the test class wants to verify the synthetic source. 
- * - * @param source the document source - * @param leaf the segment reader - * @param segmentDocID the document ID in the segment - * @return a non-null value for the document source - */ - protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) { - return source; - } - /** * Abstract method for retrieving the next operation. Should be implemented by subclasses. * diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 06bbc72b1ebf4..0c2b13aed6004 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -18,11 +18,13 @@ import org.apache.http.HttpHost; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -102,7 +104,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesReference; @@ -145,11 +146,8 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; -import org.elasticsearch.index.engine.LuceneChangesSnapshot; 
-import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.ReadOnlyEngine; -import org.elasticsearch.index.engine.SearchBasedChangesSnapshot; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; @@ -157,7 +155,8 @@ import org.elasticsearch.index.mapper.MockFieldFilterPlugin; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMetrics; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; @@ -165,7 +164,6 @@ import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.ingest.IngestPipelineTestUtils; import org.elasticsearch.monitor.jvm.HotThreads; @@ -1341,8 +1339,6 @@ protected List getDocIdAndSeqNos(final IndexShard shard) th return shard.withEngineException(engine -> getLiveDocs(engine, true)); } - private static final String NULL_ID = ""; - /** * Returns all live documents of the engine as {@link DocIdSeqNoAndSource}. * @@ -1369,163 +1365,81 @@ private static List getLiveDocs(Engine engine, boolean refr // Here we set up a source loader similar to what search fetch phase use to force loading the source, or id, before comparing docs. 
final var sourceLoader = mapperService.mappingLookup().newSourceLoader(null, SourceFieldMetrics.NOOP); final var storedFieldLoader = StoredFieldLoader.create(true, sourceLoader.requiredStoredFields()); - final TriFunction forceLoadingSource = (src, leaf, segmentDocID) -> { - if (src != null || sourceEnabled == false) { - return src; - } - try { - var leafLoader = storedFieldLoader.getLoader(leaf, null); - leafLoader.advanceTo(segmentDocID); - src = sourceLoader.leaf(leaf.reader(), new int[] { segmentDocID }).source(leafLoader, segmentDocID).internalSourceRef(); - assert src != null; - assert src.length() > 0; - return src; - } catch (IOException ioe) { - throw new AssertionError(ioe); - } - }; // Some indices merge away the _id field final var pruneIdField = engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES; - final var idLoader = IdLoader.create(mapperService.getIndexSettings(), mapperService.mappingLookup()); - final TriFunction forceLoadingId = (id, leaf, segmentDocID) -> { - if (id != null) { - return id; - } else if (pruneIdField == false) { - throw new AssertionError("Document has a null value for _id field, but ids are not merged away"); - } - try { - var leafLoader = storedFieldLoader.getLoader(leaf, null); - leafLoader.advanceTo(segmentDocID); - id = idLoader.leaf(leafLoader, leaf.reader(), new int[] { segmentDocID }).getId(segmentDocID); - assert id != null; - assert id.isEmpty() == false; - return id; - } catch (IOException ioe) { - throw new AssertionError(ioe); - } - }; - Engine.Searcher searcher = engine.acquireSearcher(reason, Engine.SearcherScope.INTERNAL); - try { - Translog.Snapshot snapshot = null; - try { - if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) { - snapshot = new LuceneSyntheticSourceChangesSnapshot( - mapperService, - searcher, - SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, - RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes(), - 0L, - Long.MAX_VALUE, - false, - true - ) { - @Override - protected 
IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) { - return new IndexSearcher(engineSearcher.getDirectoryReader()); // Return only live docs, not all docs - } + // Some integration tests merge away the _seq_no field, in which case this method sets all _seq_no to UNASSIGNED_SEQ_NO + final boolean seqNoDisabled = engineConfig.getIndexSettings().sequenceNumbersDisabled(); + assert seqNoDisabled == false + || engineConfig.getIndexSettings().seqNoIndexOptions().equals(SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY); + + final var docs = new ArrayList(); + try (var searcher = engine.acquireSearcher(reason, Engine.SearcherScope.INTERNAL)) { + final var reader = searcher.getDirectoryReader(); + for (LeafReaderContext leaf : reader.leaves()) { + final var leafReader = leaf.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + final int maxDoc = leafReader.maxDoc(); + + var segmentDocIds = new ArrayList(); + for (int docId = 0; docId < maxDoc; docId++) { + if (liveDocs != null && liveDocs.get(docId) == false) { // Return only live docs, not all docs + continue; + } + segmentDocIds.add(docId); + } + if (segmentDocIds.isEmpty()) { + continue; + } - @Override - protected boolean skipDocsWithNullSource() { - return false; // Return docs with null source too - } + int[] docIdsArray = segmentDocIds.stream().mapToInt(Integer::intValue).toArray(); + var leafStoredFieldLoader = storedFieldLoader.getLoader(leaf, docIdsArray); + var leafSourceLoader = sourceLoader.leaf(leafReader, docIdsArray); + var leafIdLoader = idLoader.leaf(leafStoredFieldLoader, leafReader, docIdsArray); - @Override - protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingId.apply(id, leaf, segmentDocID); - } + final NumericDocValues seqNoDocValues = seqNoDisabled ? 
null : leafReader.getNumericDocValues(SeqNoFieldMapper.NAME); + final NumericDocValues primaryTermDocValues = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + final NumericDocValues versionDocValues = leafReader.getNumericDocValues(VersionFieldMapper.NAME); - @Override - protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingSource.apply(source, leaf, segmentDocID); - } - }; - } else { - snapshot = new LuceneChangesSnapshot( - mapperService, - searcher, - SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, - 0L, - Long.MAX_VALUE, - false, - true, - true - ) { - @Override - protected IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) { - return new IndexSearcher(engineSearcher.getDirectoryReader()); // Return only live docs, not all docs - } + for (int docId : docIdsArray) { + leafStoredFieldLoader.advanceTo(docId); - @Override - protected boolean skipDocsWithNullSource() { - return false; // Return docs with null source too - } + final var id = leafIdLoader.getId(docId); + if (id == null && pruneIdField == false) { + throw new AssertionError("Document has a null _id but ids are not merged away"); + } - @Override - protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingId.apply(id, leaf, segmentDocID); - } + BytesRef source = null; + if (sourceEnabled) { + var src = leafSourceLoader.source(leafStoredFieldLoader, docId).internalSourceRef(); + source = src != null ? src.toBytesRef() : null; + } - @Override - protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingSource.apply(source, leaf, segmentDocID); - } - }; - } - if (snapshot.totalOperations() == 0) { - return List.of(); - } + final long seqNo = seqNoDocValues != null && seqNoDocValues.advanceExact(docId) + ? 
seqNoDocValues.longValue() + : SequenceNumbers.UNASSIGNED_SEQ_NO; - final var docs = new ArrayList(snapshot.totalOperations()); - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - DocIdSeqNoAndSource doc; - switch (operation.opType()) { - case CREATE: - case INDEX: - final var indexOp = ESTestCase.asInstanceOf(Translog.Index.class, operation); - doc = new DocIdSeqNoAndSource( - Uid.decodeId(indexOp.uid()), - sourceEnabled && indexOp.source() != null ? indexOp.source().toBytesRef() : null, - indexOp.seqNo(), - indexOp.primaryTerm(), - indexOp.version() - ); - break; - case DELETE: - final var deleteOp = ESTestCase.asInstanceOf(Translog.Delete.class, operation); - doc = new DocIdSeqNoAndSource( - Uid.decodeId(deleteOp.uid()), - null, - deleteOp.seqNo(), - deleteOp.primaryTerm(), - deleteOp.version() - ); - break; - case NO_OP: - continue; - default: - throw new AssertionError("Unsupported operation type " + operation.opType()); - } - docs.add(doc); - } - docs.sort( - Comparator.comparingLong(DocIdSeqNoAndSource::seqNo) - .thenComparingLong(DocIdSeqNoAndSource::primaryTerm) - .thenComparing((DocIdSeqNoAndSource::id)) - ); - return docs; - } finally { - if (snapshot != null) { - IOUtils.close(snapshot); - searcher = null; + boolean found = primaryTermDocValues.advanceExact(docId); + assert found : "found no primary term for: " + docId; + final long primaryTerm = primaryTermDocValues.longValue(); + + found = versionDocValues.advanceExact(docId); + assert found : "found no version for: " + docId; + final long version = versionDocValues.longValue(); + + docs.add(new DocIdSeqNoAndSource(id, source, seqNo, primaryTerm, version)); } } - } finally { - IOUtils.close(searcher); } + + docs.sort( + Comparator.comparingLong(DocIdSeqNoAndSource::seqNo) + .thenComparingLong(DocIdSeqNoAndSource::primaryTerm) + .thenComparing((DocIdSeqNoAndSource::id)) + ); + return docs; } private static List getLiveDocsNoOpEngine( From 
4879f3fd448e5c2c64d50a7d9a886f139591734b Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 11 Mar 2026 12:58:05 +0100 Subject: [PATCH 2/4] [Test] Add CCR integration tests for pruned sequence numbers This commit adds tests to verify that CCR works correctly with pruned sequence numbers. The test is inspired by SeqNoPruningIT. Note: made by Cursor, adjusted by me. Also requires #143999 to pass. Relates #136305 --- .../index/seqno/SeqNoPruningIT.java | 28 +- .../index/engine/LuceneChangesSnapshot.java | 10 +- .../index/seqno/SequenceNumbersTestUtils.java | 45 ++- .../xpack/ccr/CcrSeqNoPruningIT.java | 299 ++++++++++++++++++ 4 files changed, 347 insertions(+), 35 deletions(-) create mode 100644 x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java index fb0e0b86f1755..3238c4408979e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.List; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -80,7 +81,7 @@ public void testSeqNoPrunedAfterMerge() throws Exception { ); // waits for retention leases to advance past all docs - assertRetentionLeasesAdvanced(indexName, totalDocs); + assertRetentionLeasesAdvanced(client(), indexName, totalDocs); var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); 
assertThat(forceMerge.getFailedShards(), equalTo(0)); @@ -264,7 +265,7 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception { final long newMaxSeqNo = indicesAdmin().prepareStats(indexName).get().getShards()[0].getSeqNoStats().getMaxSeqNo(); // wait for all retention leases to advance past all docs - assertRetentionLeasesAdvanced(indexName, newMaxSeqNo + 1); + assertRetentionLeasesAdvanced(client(), indexName, newMaxSeqNo + 1); forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); assertThat(forceMerge.getFailedShards(), equalTo(0)); @@ -280,27 +281,4 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception { assertShardsHaveSeqNoDocValues(indexName, false, 1); } - /** - * Waits for all retention leases on all copies of the given index to have their retaining sequence number - * equal to the expected value. - */ - private static void assertRetentionLeasesAdvanced(String indexName, long expectedRetainingSeqNo) throws Exception { - assertBusy(() -> { - for (var indicesServices : internalCluster().getDataNodeInstances(IndicesService.class)) { - for (var indexService : indicesServices) { - if (indexService.index().getName().equals(indexName)) { - for (var indexShard : indexService) { - for (RetentionLease lease : indexShard.getRetentionLeases().leases()) { - assertThat( - "retention lease [" + lease.id() + "] should have advanced", - lease.retainingSequenceNumber(), - equalTo(expectedRetainingSeqNo) - ); - } - } - } - } - } - }); - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 495deb14e9dea..3cc1b47958505 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -270,15 +270,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws 
IOException { } // TODO: pass the latest timestamp from engine. final long autoGeneratedIdTimestamp = -1; - op = new Translog.Index( - id, - seqNo, - primaryTerm, - version, - source, - fields.routing(), - autoGeneratedIdTimestamp - ); + op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp); } } assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java index 3625df19d698b..f9819ae587e1c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java @@ -10,9 +10,11 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.DocIdSetIterator; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import java.io.IOException; @@ -30,14 +32,32 @@ private SequenceNumbersTestUtils() {} /** * Asserts that all shards of the given index either have or lack {@code _seq_no} doc values on disk. + * Uses the default {@link ESIntegTestCase#internalCluster()}. 
* * @param indexName the index to check * @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty * @param expectedShards the exact number of shards expected to be verified */ public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expectDocValuesOnDisk, int expectedShards) { + assertShardsHaveSeqNoDocValues(ESIntegTestCase.internalCluster(), indexName, expectDocValuesOnDisk, expectedShards); + } + + /** + * Asserts that all shards of the given index on the given cluster either have or lack {@code _seq_no} doc values on disk. + * + * @param cluster the cluster to check + * @param indexName the index to check + * @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty + * @param expectedShards the exact number of shards expected to be verified + */ + public static void assertShardsHaveSeqNoDocValues( + InternalTestCluster cluster, + String indexName, + boolean expectDocValuesOnDisk, + int expectedShards + ) { int nbCheckedShards = 0; - for (var indicesServices : ESIntegTestCase.internalCluster().getDataNodeInstances(IndicesService.class)) { + for (var indicesServices : cluster.getDataNodeInstances(IndicesService.class)) { for (var indexService : indicesServices) { if (indexService.index().getName().equals(indexName)) { for (var indexShard : indexService) { @@ -75,4 +95,27 @@ public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expe } assertThat("expected to verify " + expectedShards + " shard(s)", nbCheckedShards, equalTo(expectedShards)); } + + /** + * Waits until all retention leases on all shards of the given index have their retaining sequence number + * equal to the expected value. 
+ * + * @param client the client to use for stats requests + * @param indexName the index to check + * @param expectedRetainingSeqNo the expected retaining sequence number for every lease + */ + public static void assertRetentionLeasesAdvanced(Client client, String indexName, long expectedRetainingSeqNo) throws Exception { + ESIntegTestCase.assertBusy(() -> { + var stats = client.admin().indices().prepareStats(indexName).get(); + for (var shardStats : stats.getShards()) { + for (RetentionLease lease : shardStats.getRetentionLeaseStats().retentionLeases().leases()) { + assertThat( + "retention lease [" + lease.id() + "] should have advanced", + lease.retainingSequenceNumber(), + equalTo(expectedRetainingSeqNo) + ); + } + } + }); + } } diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java new file mode 100644 index 0000000000000..cb82e846dfd6e --- /dev/null +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java @@ -0,0 +1,299 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.ccr; + +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +public class CcrSeqNoPruningIT extends CcrIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(CcrRetentionLeaseIT.RetentionLeaseRenewIntervalSettingPlugin.class)) + 
.collect(Collectors.toList()); + } + + @Override + protected Settings followerClusterSettings() { + return Settings.builder() + .put(super.followerClusterSettings()) + .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(100)) + .build(); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + public void testSeqNoPrunedOnLeaderAfterFollowerCatchesUp() throws Exception { + assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); + + final var leaderIndex = randomIdentifier(); + final var followerIndex = "follower-" + leaderIndex; + final int numberOfShards = 1; + + final var additionalSettings = Map.of( + IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), + "true", + IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), + SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY.toString(), + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(100).getStringRep() + ); + + final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, additionalSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderGreen(leaderIndex); + + followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex)).get(); + + final int nbBatches = randomIntBetween(5, 10); + final int docsPerBatch = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch + doc)); + } + assertNoFailures(bulk.get()); + } + + flush(leaderClient(), leaderIndex); + + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, true, numberOfShards); + assertThat( + 
leaderClient().admin() + .indices() + .prepareStats(leaderIndex) + .clear() + .setSegments(true) + .get() + .getPrimaries() + .getSegments() + .getCount(), + greaterThan(1L) + ); + + final long maxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, maxSeqNo + 1); + + var forceMerge = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + assertThat( + leaderClient().admin() + .indices() + .prepareStats(leaderIndex) + .clear() + .setSegments(true) + .get() + .getPrimaries() + .getSegments() + .getCount(), + equalTo(1L) + ); + + refresh(leaderClient(), leaderIndex); + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, false, numberOfShards); + + ensureFollowerGreen(true, followerIndex); + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + } + + public void testSeqNoPartiallyRetainedByCcrLease() throws Exception { + assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); + + final var leaderIndex = randomIdentifier(); + final var followerIndex = "follower-" + leaderIndex; + final int numberOfShards = 1; + + final var additionalSettings = Map.of( + IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), + "true", + IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), + SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY.toString(), + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(100).getStringRep() + ); + + final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, additionalSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderGreen(leaderIndex); + + followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex)).get(); + ensureFollowerGreen(true, 
followerIndex); + + final int nbBatches = randomIntBetween(5, 10); + final int docsPerBatch = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch + doc)); + } + assertNoFailures(bulk.get()); + } + flush(leaderClient(), leaderIndex); + + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + final long leaseSeqNoBeforePause = getMaxSeqNo(leaderClient(), leaderIndex) + 1; + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, leaseSeqNoBeforePause); + pauseFollow(followerIndex); + + final int nbBatches2 = randomIntBetween(5, 10); + final int docsPerBatch2 = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches2; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch2; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch2 + doc)); + } + assertNoFailures(bulk.get()); + } + flush(leaderClient(), leaderIndex); + + final long newMaxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); + assertBusy(() -> { + var stats = leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + for (ShardStats shardStats : stats.getShards()) { + var ccrLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( + shardStats.getRetentionLeaseStats().retentionLeases() + ); + assertThat(ccrLeases.values(), hasSize(1)); + RetentionLease ccrLease = ccrLeases.values().iterator().next(); + assertThat(ccrLease.retainingSequenceNumber(), equalTo(leaseSeqNoBeforePause)); + + for (RetentionLease lease : shardStats.getRetentionLeaseStats().retentionLeases().leases()) { + if (lease.id().equals(ccrLease.id()) == false) 
{ + assertThat( + "peer recovery lease [" + lease.id() + "] should have advanced", + lease.retainingSequenceNumber(), + equalTo(newMaxSeqNo + 1) + ); + } + } + } + }); + + var forceMerge = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + flush(leaderClient(), leaderIndex); + + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, true, numberOfShards); + + final long expectedRetainedDocs = newMaxSeqNo + 1 - leaseSeqNoBeforePause; + assertLeaderShardsRetainedSeqNoDocValuesCount(leaderIndex, expectedRetainedDocs, numberOfShards); + + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet(); + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, newMaxSeqNo + 1); + + final int nbBatches3 = randomIntBetween(5, 10); + final int docsPerBatch3 = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches3; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch3; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch3 + doc)); + } + assertNoFailures(bulk.get()); + } + flush(leaderClient(), leaderIndex); + + final long finalMaxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); + + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, finalMaxSeqNo + 1); + + forceMerge = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + refresh(leaderClient(), leaderIndex); + + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, false, numberOfShards); + + final var newFollowerIndex = "new-follower-" + leaderIndex; + 
followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, newFollowerIndex)).get(); + ensureFollowerGreen(true, newFollowerIndex); + + assertIndexFullyReplicatedToFollower(leaderIndex, newFollowerIndex); + } + + private static long getMaxSeqNo(Client client, String index) { + return client.admin().indices().prepareStats(index).get().getShards()[0].getSeqNoStats().getMaxSeqNo(); + } + + private void assertLeaderShardsRetainedSeqNoDocValuesCount(String indexName, long expectedCount, int expectedShards) { + int checked = 0; + for (IndicesService indicesService : getLeaderCluster().getDataNodeInstances(IndicesService.class)) { + for (var indexService : indicesService) { + if (indexService.index().getName().equals(indexName)) { + for (var indexShard : indexService) { + Long count = indexShard.withEngineOrNull(engine -> { + if (engine == null) { + return null; + } + try (var searcher = engine.acquireSearcher("assert_seq_no_count")) { + long total = 0; + for (var leaf : searcher.getLeafContexts()) { + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (seqNoDV != null) { + while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + total++; + } + } + } + return total; + } catch (IOException e) { + throw new AssertionError(e); + } + }); + if (count != null) { + assertThat("retained seq_no doc values count", count, equalTo(expectedCount)); + checked++; + } + } + } + } + } + assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards)); + } +} From a032f9a5f971ae9f22f8ada8ee2aa51ba03b0e45 Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 11 Mar 2026 15:50:10 +0100 Subject: [PATCH 3/4] move method --- .../index/seqno/SequenceNumbersTestUtils.java | 49 +++++++++++++++++++ .../xpack/ccr/CcrSeqNoPruningIT.java | 42 +--------------- 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java 
b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java index f9819ae587e1c..52e3d6e06654d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java @@ -96,6 +96,55 @@ public static void assertShardsHaveSeqNoDocValues( assertThat("expected to verify " + expectedShards + " shard(s)", nbCheckedShards, equalTo(expectedShards)); } + /** + * Asserts that the total number of {@code _seq_no} doc values across all shards of the given index equals the expected count. + * + * @param cluster the cluster to check + * @param indexName the index to check + * @param expectedCount the expected total number of doc values per shard + * @param expectedShards the exact number of shards expected to be verified + */ + public static void assertShardsSeqNoDocValuesCount( + InternalTestCluster cluster, + String indexName, + long expectedCount, + int expectedShards + ) { + int checked = 0; + for (IndicesService indicesService : cluster.getDataNodeInstances(IndicesService.class)) { + for (var indexService : indicesService) { + if (indexService.index().getName().equals(indexName)) { + for (var indexShard : indexService) { + Long count = indexShard.withEngineOrNull(engine -> { + if (engine == null) { + return null; + } + try (var searcher = engine.acquireSearcher("assert_seq_no_count")) { + long total = 0; + for (var leaf : searcher.getLeafContexts()) { + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (seqNoDV != null) { + while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + total++; + } + } + } + return total; + } catch (IOException e) { + throw new AssertionError(e); + } + }); + if (count != null) { + assertThat("retained seq_no doc values count", count, equalTo(expectedCount)); + checked++; + } + } + } + } + } + assertThat("expected to verify " + expectedShards + " 
shard(s)", checked, equalTo(expectedShards)); + } + /** * Waits until all retention leases on all shards of the given index have their retaining sequence number * equal to the expected value. diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java index cb82e846dfd6e..addd90345fd5e 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.ccr; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.search.DocIdSetIterator; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.WriteRequest; @@ -20,14 +18,12 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseUtils; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; -import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; @@ -35,6 +31,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsSeqNoDocValuesCount; import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; @@ -219,7 +216,7 @@ public void testSeqNoPartiallyRetainedByCcrLease() throws Exception { assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, true, numberOfShards); final long expectedRetainedDocs = newMaxSeqNo + 1 - leaseSeqNoBeforePause; - assertLeaderShardsRetainedSeqNoDocValuesCount(leaderIndex, expectedRetainedDocs, numberOfShards); + assertShardsSeqNoDocValuesCount(getLeaderCluster(), leaderIndex, expectedRetainedDocs, numberOfShards); followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet(); assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); @@ -261,39 +258,4 @@ private static long getMaxSeqNo(Client client, String index) { return client.admin().indices().prepareStats(index).get().getShards()[0].getSeqNoStats().getMaxSeqNo(); } - private void assertLeaderShardsRetainedSeqNoDocValuesCount(String indexName, long expectedCount, int expectedShards) { - int checked = 0; - for (IndicesService indicesService : getLeaderCluster().getDataNodeInstances(IndicesService.class)) { - for (var indexService : indicesService) { - if (indexService.index().getName().equals(indexName)) { - for (var indexShard : indexService) { - Long count = indexShard.withEngineOrNull(engine -> { - if (engine == null) { - return null; - } - try (var searcher = engine.acquireSearcher("assert_seq_no_count")) { - long total = 0; - for (var leaf : searcher.getLeafContexts()) { - NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - if (seqNoDV != null) { - while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - total++; - } - } - } - return total; - } catch (IOException e) { - throw new AssertionError(e); - } - }); - if (count != null) { - assertThat("retained seq_no doc values count", count, 
equalTo(expectedCount)); - checked++; - } - } - } - } - } - assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards)); - } } From 918633a654e4d09add0f27a854c83113996fa66c Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 11 Mar 2026 15:58:54 +0100 Subject: [PATCH 4/4] nit --- .../java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java index addd90345fd5e..d261e27a404f8 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java @@ -79,8 +79,6 @@ public void testSeqNoPrunedOnLeaderAfterFollowerCatchesUp() throws Exception { assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); ensureLeaderGreen(leaderIndex); - followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex)).get(); - final int nbBatches = randomIntBetween(5, 10); final int docsPerBatch = randomIntBetween(20, 50); @@ -108,6 +106,8 @@ public void testSeqNoPrunedOnLeaderAfterFollowerCatchesUp() throws Exception { greaterThan(1L) ); + followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex)).get(); + final long maxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, maxSeqNo + 1);