diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 99ba6aed134cf..3cc1b47958505 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -35,7 +35,7 @@ /** * A {@link Translog.Snapshot} from changes in a Lucene index */ -public class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { +public final class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { private long lastSeenSeqNo; private int skippedOperations; private final boolean singleConsumer; @@ -63,7 +63,6 @@ public class LuceneChangesSnapshot extends SearchBasedChangesSnapshot { * @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot * @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()} */ - @SuppressWarnings("this-escape") public LuceneChangesSnapshot( MapperService mapperService, Engine.Searcher engineSearcher, @@ -252,7 +251,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; } else { - final String id = overrideId(fields.id(), leaf, segmentDocID); + final String id = fields.id(); if (isTombstone) { op = new Translog.Delete(id, seqNo, primaryTerm, version); assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]"; @@ -264,22 +263,14 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { throw new MissingHistoryOperationsException( "source not found for seqno=" + seqNo + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo ); - } else if (skipDocsWithNullSource()) { + } else { skippedOperations++; return null; } } // TODO: pass the latest timestamp from engine. final long autoGeneratedIdTimestamp = -1; - op = new Translog.Index( - id, - seqNo, - primaryTerm, - version, - overrideSource(source, leaf, segmentDocID), - fields.routing(), - autoGeneratedIdTimestamp - ); + op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp); } } assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java index a113c6d139a0c..673c60c775f51 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java @@ -41,7 +41,7 @@ * The {@code maxMemorySizeInBytes} parameter limits the total size of uncompressed _sources * loaded into memory during batch retrieval. */ -public class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot { +public final class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot { private final long maxMemorySizeInBytes; private final StoredFieldLoader storedFieldLoader; private final SourceLoader sourceLoader; @@ -203,7 +203,7 @@ var record = documentRecords.get(j); if (record.isTombstone()) { continue; } - if (record.hasRecoverySourceSize() == false && skipDocsWithNullSource()) { + if (record.hasRecoverySourceSize() == false) { assert requiredFullRange == false : "source not found for seqno=" + record.seqNo(); continue; } @@ -243,12 +243,7 @@ private Translog.Operation createOperation( return new Translog.NoOp(docRecord.seqNo(), docRecord.primaryTerm(), "null"); } else if (docRecord.isTombstone()) { assert assertDocSoftDeleted(context.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + docRecord + "]"; - return new Translog.Delete( - overrideId(fieldLoader.id(), context, segmentDocID), - docRecord.seqNo(), - docRecord.primaryTerm(), - docRecord.version() - ); + return new Translog.Delete(fieldLoader.id(), docRecord.seqNo(), docRecord.primaryTerm(), docRecord.version()); } else { if (docRecord.hasRecoverySourceSize() == false) { // TODO: Callers should ask for the range that source should be retained. Thus we should always @@ -257,18 +252,18 @@ private Translog.Operation createOperation( throw new MissingHistoryOperationsException( "source not found for seqno=" + docRecord.seqNo() + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo ); - } else if (skipDocsWithNullSource()) { + } else { skippedOperations++; return null; } } var source = addSyntheticFields(sourceLoader.source(fieldLoader, segmentDocID), segmentDocID); return new Translog.Index( - overrideId(fieldLoader.id(), context, segmentDocID), + fieldLoader.id(), docRecord.seqNo(), docRecord.primaryTerm(), docRecord.version(), - overrideSource(source != null ? source.internalSourceRef() : null, context, segmentDocID), + source != null ? source.internalSourceRef() : null, fieldLoader.routing(), -1 // autogenerated timestamp ); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java index 48dd53a0b2649..8e037279e813e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java @@ -21,7 +21,6 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldCollectorManager; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.IOUtils; @@ -100,7 +99,7 @@ protected SearchBasedChangesSnapshot( this.toSeqNo = toSeqNo; this.lastSeenSeqNo = fromSeqNo - 1; this.requiredFullRange = requiredFullRange; - this.indexSearcher = createIndexSearcher(engineSearcher); + this.indexSearcher = newIndexSearcher(engineSearcher); this.indexSearcher.setQueryCache(null); long requestingSize = (toSeqNo - fromSeqNo == Long.MAX_VALUE) ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L); @@ -111,18 +110,6 @@ protected SearchBasedChangesSnapshot( this.sourceMetadataFetcher = createSourceMetadataValueFetcher(mapperService, indexSearcher); } - /** - * Creates the {@link IndexSearcher} used to list the documents to include in the snapshot. By default, all documents are returned - * including the non-live ones. This method can be overridden in test to only return live documents. - * - * @param engineSearcher the {@link Engine.Searcher} to create the {@link IndexSearcher} from - * @return an {@link IndexSearcher} - * @throws IOException if something goes wrong - */ - protected IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) throws IOException { - return newIndexSearcher(engineSearcher); - } - private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperService, IndexSearcher searcher) { if (mapperService.mappingLookup().inferenceFields().isEmpty()) { return null; @@ -135,40 +122,6 @@ private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperServic : null; } - /** - * @return if true, documents with _source disabled are also returned. This shouldn't be the case in peer-recoveries where the source is - * retained but this method exist to allow tests classes to also list documents without source. - */ - protected boolean skipDocsWithNullSource() { - return true; - } - - /** - * Allows test classes to override the id of documents loaded in the snapshot. This is useful when the id of the document is null after - * having being trimmed during merges but the test class wants to verify the synthetic id. - * - * @param id the document id - * @param leaf the segment reader - * @param segmentDocID the document ID in the segment - * @return a non-null value for the document id - */ - protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) { - return id; - } - - /** - * Allows test classes to override the source of documents loaded in the snapshot. This is useful when the source of the document is - * null after having being trimmed during merges but the test class wants to verify the synthetic source. - * - * @param source the document source - * @param leaf the segment reader - * @param segmentDocID the document ID in the segment - * @return a non-null value for the document source - */ - protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) { - return source; - } - /** * Abstract method for retrieving the next operation. Should be implemented by subclasses. * diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 06bbc72b1ebf4..ae2653bab29cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -18,11 +18,13 @@ import org.apache.http.HttpHost; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -102,7 +104,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesReference; @@ -145,11 +146,8 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; -import org.elasticsearch.index.engine.LuceneChangesSnapshot; -import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.ReadOnlyEngine; -import org.elasticsearch.index.engine.SearchBasedChangesSnapshot; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; @@ -157,7 +155,8 @@ import org.elasticsearch.index.mapper.MockFieldFilterPlugin; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMetrics; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; @@ -165,7 +164,6 @@ import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.ingest.IngestPipelineTestUtils; import org.elasticsearch.monitor.jvm.HotThreads; @@ -1341,8 +1339,6 @@ protected List getDocIdAndSeqNos(final IndexShard shard) th return shard.withEngineException(engine -> getLiveDocs(engine, true)); } - private static final String NULL_ID = ""; - /** * Returns all live documents of the engine as {@link DocIdSeqNoAndSource}. * @@ -1369,163 +1365,88 @@ private static List getLiveDocs(Engine engine, boolean refr // Here we set up a source loader similar to what search fetch phase use to force loading the source, or id, before comparing docs. final var sourceLoader = mapperService.mappingLookup().newSourceLoader(null, SourceFieldMetrics.NOOP); final var storedFieldLoader = StoredFieldLoader.create(true, sourceLoader.requiredStoredFields()); - final TriFunction forceLoadingSource = (src, leaf, segmentDocID) -> { - if (src != null || sourceEnabled == false) { - return src; - } - try { - var leafLoader = storedFieldLoader.getLoader(leaf, null); - leafLoader.advanceTo(segmentDocID); - src = sourceLoader.leaf(leaf.reader(), new int[] { segmentDocID }).source(leafLoader, segmentDocID).internalSourceRef(); - assert src != null; - assert src.length() > 0; - return src; - } catch (IOException ioe) { - throw new AssertionError(ioe); - } - }; // Some indices merge away the _id field final var pruneIdField = engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES; - final var idLoader = IdLoader.create(mapperService.getIndexSettings(), mapperService.mappingLookup()); - final TriFunction forceLoadingId = (id, leaf, segmentDocID) -> { - if (id != null) { - return id; - } else if (pruneIdField == false) { - throw new AssertionError("Document has a null value for _id field, but ids are not merged away"); - } - try { - var leafLoader = storedFieldLoader.getLoader(leaf, null); - leafLoader.advanceTo(segmentDocID); - id = idLoader.leaf(leafLoader, leaf.reader(), new int[] { segmentDocID }).getId(segmentDocID); - assert id != null; - assert id.isEmpty() == false; - return id; - } catch (IOException ioe) { - throw new AssertionError(ioe); - } - }; - Engine.Searcher searcher = engine.acquireSearcher(reason, Engine.SearcherScope.INTERNAL); - try { - Translog.Snapshot snapshot = null; - try { - if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) { - snapshot = new LuceneSyntheticSourceChangesSnapshot( - mapperService, - searcher, - SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, - RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes(), - 0L, - Long.MAX_VALUE, - false, - true - ) { - @Override - protected IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) { - return new IndexSearcher(engineSearcher.getDirectoryReader()); // Return only live docs, not all docs - } + // Some integration tests merge away the _seq_no field, in which case this method sets all _seq_no to UNASSIGNED_SEQ_NO + final boolean seqNoDisabled = engineConfig.getIndexSettings().sequenceNumbersDisabled(); + assert seqNoDisabled == false + || engineConfig.getIndexSettings().seqNoIndexOptions().equals(SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY); + + final var docs = new ArrayList(); + try (var searcher = engine.acquireSearcher(reason, Engine.SearcherScope.INTERNAL)) { + final var reader = searcher.getDirectoryReader(); + for (LeafReaderContext leaf : reader.leaves()) { + final var leafReader = leaf.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + final int maxDoc = leafReader.maxDoc(); + + var segmentDocIds = new ArrayList(); + // Only collect root documents; nested documents lack _primary_term doc values + var primaryTermDocValues = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (primaryTermDocValues == null) { + continue; + } + for (int docId = 0; docId < maxDoc; docId++) { + if (liveDocs != null && liveDocs.get(docId) == false) { + continue; + } + if (primaryTermDocValues.advanceExact(docId)) { + segmentDocIds.add(docId); + } + } + if (segmentDocIds.isEmpty()) { + continue; + } - @Override - protected boolean skipDocsWithNullSource() { - return false; // Return docs with null source too - } + int[] docIdsArray = segmentDocIds.stream().mapToInt(Integer::intValue).toArray(); + var leafStoredFieldLoader = storedFieldLoader.getLoader(leaf, docIdsArray); + var leafSourceLoader = sourceLoader.leaf(leafReader, docIdsArray); + var leafIdLoader = idLoader.leaf(leafStoredFieldLoader, leafReader, docIdsArray); - @Override - protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingId.apply(id, leaf, segmentDocID); - } + primaryTermDocValues = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + final NumericDocValues seqNoDocValues = seqNoDisabled ? null : leafReader.getNumericDocValues(SeqNoFieldMapper.NAME); + final NumericDocValues versionDocValues = leafReader.getNumericDocValues(VersionFieldMapper.NAME); - @Override - protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingSource.apply(source, leaf, segmentDocID); - } - }; - } else { - snapshot = new LuceneChangesSnapshot( - mapperService, - searcher, - SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, - 0L, - Long.MAX_VALUE, - false, - true, - true - ) { - @Override - protected IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) { - return new IndexSearcher(engineSearcher.getDirectoryReader()); // Return only live docs, not all docs - } + for (int docId : docIdsArray) { + leafStoredFieldLoader.advanceTo(docId); - @Override - protected boolean skipDocsWithNullSource() { - return false; // Return docs with null source too - } + final var id = leafIdLoader.getId(docId); + if (id == null && pruneIdField == false) { + throw new AssertionError("Document has a null _id but ids are not merged away"); + } - @Override - protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingId.apply(id, leaf, segmentDocID); - } + BytesRef source = null; + if (sourceEnabled) { + var src = leafSourceLoader.source(leafStoredFieldLoader, docId).internalSourceRef(); + source = src != null ? src.toBytesRef() : null; + } - @Override - protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) { - return forceLoadingSource.apply(source, leaf, segmentDocID); - } - }; - } - if (snapshot.totalOperations() == 0) { - return List.of(); - } + final long seqNo = seqNoDocValues != null && seqNoDocValues.advanceExact(docId) + ? seqNoDocValues.longValue() + : SequenceNumbers.UNASSIGNED_SEQ_NO; - final var docs = new ArrayList(snapshot.totalOperations()); - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - DocIdSeqNoAndSource doc; - switch (operation.opType()) { - case CREATE: - case INDEX: - final var indexOp = ESTestCase.asInstanceOf(Translog.Index.class, operation); - doc = new DocIdSeqNoAndSource( - Uid.decodeId(indexOp.uid()), - sourceEnabled && indexOp.source() != null ? indexOp.source().toBytesRef() : null, - indexOp.seqNo(), - indexOp.primaryTerm(), - indexOp.version() - ); - break; - case DELETE: - final var deleteOp = ESTestCase.asInstanceOf(Translog.Delete.class, operation); - doc = new DocIdSeqNoAndSource( - Uid.decodeId(deleteOp.uid()), - null, - deleteOp.seqNo(), - deleteOp.primaryTerm(), - deleteOp.version() - ); - break; - case NO_OP: - continue; - default: - throw new AssertionError("Unsupported operation type " + operation.opType()); - } - docs.add(doc); - } - docs.sort( - Comparator.comparingLong(DocIdSeqNoAndSource::seqNo) - .thenComparingLong(DocIdSeqNoAndSource::primaryTerm) - .thenComparing((DocIdSeqNoAndSource::id)) - ); - return docs; - } finally { - if (snapshot != null) { - IOUtils.close(snapshot); - searcher = null; + boolean found = primaryTermDocValues.advanceExact(docId); + assert found : "found no primary term for: " + docId; + final long primaryTerm = primaryTermDocValues.longValue(); + + found = versionDocValues.advanceExact(docId); + assert found : "found no version for: " + docId; + final long version = versionDocValues.longValue(); + + docs.add(new DocIdSeqNoAndSource(id, source, seqNo, primaryTerm, version)); } } - } finally { - IOUtils.close(searcher); } + + docs.sort( + Comparator.comparingLong(DocIdSeqNoAndSource::seqNo) + .thenComparingLong(DocIdSeqNoAndSource::primaryTerm) + .thenComparing((DocIdSeqNoAndSource::id)) + ); + return docs; } private static List getLiveDocsNoOpEngine(