Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/**
* A {@link Translog.Snapshot} from changes in a Lucene index
*/
public class LuceneChangesSnapshot extends SearchBasedChangesSnapshot {
public final class LuceneChangesSnapshot extends SearchBasedChangesSnapshot {
private long lastSeenSeqNo;
private int skippedOperations;
private final boolean singleConsumer;
Expand Down Expand Up @@ -63,7 +63,6 @@ public class LuceneChangesSnapshot extends SearchBasedChangesSnapshot {
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
* @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()}
*/
@SuppressWarnings("this-escape")
public LuceneChangesSnapshot(
MapperService mapperService,
Engine.Searcher engineSearcher,
Expand Down Expand Up @@ -252,7 +251,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]";
} else {
final String id = overrideId(fields.id(), leaf, segmentDocID);
final String id = fields.id();
if (isTombstone) {
op = new Translog.Delete(id, seqNo, primaryTerm, version);
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
Expand All @@ -264,22 +263,14 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
throw new MissingHistoryOperationsException(
"source not found for seqno=" + seqNo + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo
);
} else if (skipDocsWithNullSource()) {
} else {
skippedOperations++;
return null;
}
}
// TODO: pass the latest timestamp from engine.
final long autoGeneratedIdTimestamp = -1;
op = new Translog.Index(
id,
seqNo,
primaryTerm,
version,
overrideSource(source, leaf, segmentDocID),
fields.routing(),
autoGeneratedIdTimestamp
);
op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp);
}
}
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* The {@code maxMemorySizeInBytes} parameter limits the total size of uncompressed _sources
* loaded into memory during batch retrieval.
*/
public class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot {
public final class LuceneSyntheticSourceChangesSnapshot extends SearchBasedChangesSnapshot {
private final long maxMemorySizeInBytes;
private final StoredFieldLoader storedFieldLoader;
private final SourceLoader sourceLoader;
Expand Down Expand Up @@ -203,7 +203,7 @@ var record = documentRecords.get(j);
if (record.isTombstone()) {
continue;
}
if (record.hasRecoverySourceSize() == false && skipDocsWithNullSource()) {
if (record.hasRecoverySourceSize() == false) {
assert requiredFullRange == false : "source not found for seqno=" + record.seqNo();
continue;
}
Expand Down Expand Up @@ -243,12 +243,7 @@ private Translog.Operation createOperation(
return new Translog.NoOp(docRecord.seqNo(), docRecord.primaryTerm(), "null");
} else if (docRecord.isTombstone()) {
assert assertDocSoftDeleted(context.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + docRecord + "]";
return new Translog.Delete(
overrideId(fieldLoader.id(), context, segmentDocID),
docRecord.seqNo(),
docRecord.primaryTerm(),
docRecord.version()
);
return new Translog.Delete(fieldLoader.id(), docRecord.seqNo(), docRecord.primaryTerm(), docRecord.version());
} else {
if (docRecord.hasRecoverySourceSize() == false) {
// TODO: Callers should ask for the range that source should be retained. Thus we should always
Expand All @@ -257,18 +252,18 @@ private Translog.Operation createOperation(
throw new MissingHistoryOperationsException(
"source not found for seqno=" + docRecord.seqNo() + " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo
);
} else if (skipDocsWithNullSource()) {
} else {
skippedOperations++;
return null;
}
}
var source = addSyntheticFields(sourceLoader.source(fieldLoader, segmentDocID), segmentDocID);
return new Translog.Index(
overrideId(fieldLoader.id(), context, segmentDocID),
fieldLoader.id(),
docRecord.seqNo(),
docRecord.primaryTerm(),
docRecord.version(),
overrideSource(source != null ? source.internalSourceRef() : null, context, segmentDocID),
source != null ? source.internalSourceRef() : null,
fieldLoader.routing(),
-1 // autogenerated timestamp
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollectorManager;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.core.IOUtils;
Expand Down Expand Up @@ -100,7 +99,7 @@ protected SearchBasedChangesSnapshot(
this.toSeqNo = toSeqNo;
this.lastSeenSeqNo = fromSeqNo - 1;
this.requiredFullRange = requiredFullRange;
this.indexSearcher = createIndexSearcher(engineSearcher);
this.indexSearcher = newIndexSearcher(engineSearcher);
this.indexSearcher.setQueryCache(null);

long requestingSize = (toSeqNo - fromSeqNo == Long.MAX_VALUE) ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L);
Expand All @@ -111,18 +110,6 @@ protected SearchBasedChangesSnapshot(
this.sourceMetadataFetcher = createSourceMetadataValueFetcher(mapperService, indexSearcher);
}

/**
* Creates the {@link IndexSearcher} used to list the documents to include in the snapshot. By default, all documents are returned
* including the non-live ones. This method can be overridden in tests to only return live documents.
* <p>
* The default implementation simply delegates to {@code newIndexSearcher(engineSearcher)}. This method is invoked from the
* constructor, so an override runs before the overriding class has initialized its own fields.
*
* @param engineSearcher the {@link Engine.Searcher} to create the {@link IndexSearcher} from
* @return an {@link IndexSearcher}
* @throws IOException if something goes wrong
*/
protected IndexSearcher createIndexSearcher(Engine.Searcher engineSearcher) throws IOException {
return newIndexSearcher(engineSearcher);
}

private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperService, IndexSearcher searcher) {
if (mapperService.mappingLookup().inferenceFields().isEmpty()) {
return null;
Expand All @@ -135,40 +122,6 @@ private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperServic
: null;
}

/**
* Controls whether documents whose source is missing are skipped (and counted as skipped operations) rather than returned by the
* snapshot when the full range of operations is not required. Missing sources shouldn't occur in peer recoveries, where the source is
* retained, but this method exists to allow test classes to also list documents without source.
*
* @return {@code true} (the default) to skip documents with a null source; {@code false} to return them as well
*/
protected boolean skipDocsWithNullSource() {
return true;
}

/**
* Allows test classes to override the id of documents loaded in the snapshot. This is useful when the id of the document is null after
* having been trimmed during merges but the test class wants to verify the synthetic id.
* <p>
* The default implementation returns {@code id} unchanged, so it only returns a non-null value if the given id is non-null.
*
* @param id the document id
* @param leaf the segment reader
* @param segmentDocID the document ID in the segment
* @return the document id to use; overrides are expected to return a non-null value
*/
protected String overrideId(String id, LeafReaderContext leaf, int segmentDocID) {
return id;
}

/**
* Allows test classes to override the source of documents loaded in the snapshot. This is useful when the source of the document is
* null after having been trimmed during merges but the test class wants to verify the synthetic source.
* <p>
* The default implementation returns {@code source} unchanged, so it only returns a non-null value if the given source is non-null.
*
* @param source the document source
* @param leaf the segment reader
* @param segmentDocID the document ID in the segment
* @return the document source to use; overrides are expected to return a non-null value
*/
protected BytesReference overrideSource(BytesReference source, LeafReaderContext leaf, int segmentDocID) {
return source;
}

/**
* Abstract method for retrieving the next operation. Should be implemented by subclasses.
*
Expand Down
Loading
Loading