Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added

### Changed
- Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))

### Fixed

Expand Down
82 changes: 82 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.Mapping;
import org.opensearch.index.mapper.ParseContext.Document;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -1594,6 +1597,85 @@ public long startTime() {
public abstract TYPE operationType();
}

/**
* Prepares an index operation by parsing the source document and creating an Engine.Index operation.
*
* @param docMapper the document mapper instance
* @param source the source to parse
* @param seqNo the sequence number
* @param primaryTerm the primary term
* @param version the version
* @param versionType the version type
* @param origin the operation origin
* @param autoGeneratedIdTimestamp the timestamp for auto-generated IDs
* @param isRetry whether this is a retry
* @param ifSeqNo the ifSeqNo
* @param ifPrimaryTerm the ifPrimaryTerm
* @return the prepared index operation
*/
public Engine.Index prepareIndex(
DocumentMapperForType docMapper,
SourceToParse source,
long seqNo,
long primaryTerm,
long version,
VersionType versionType,
Engine.Operation.Origin origin,
long autoGeneratedIdTimestamp,
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm
) {
Comment on lines +1616 to +1628
Copy link
Contributor

@Bukhtawar Bukhtawar Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concern with this change is implementations of the Engine interface like the NRTReplicationEngine now gets this method while in practise it isn't supposed to index any document. Thoughts @mch2 ?

Do we need to override this method appropriately?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for NRT specifically prepareIndex is never invoked -

if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary() == false) {
.

but we can take this opportunity to refactor more and remove that block from indexshard and push the noOp to NRT engine directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree this isn't getting used but always good to be explicit for future use cases

long startTime = System.nanoTime();
ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
if (docMapper.getMapping() != null) {
doc.addDynamicMappingsUpdate(docMapper.getMapping());
}
Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id()));
return new Engine.Index(
uid,
doc,
seqNo,
primaryTerm,
version,
versionType,
origin,
startTime,
autoGeneratedIdTimestamp,
isRetry,
ifSeqNo,
ifPrimaryTerm
);
}

/**
* Prepares a delete operation by creating an Engine.Delete operation.
*
* @param id the document id
* @param seqNo the sequence number
* @param primaryTerm the primary term
* @param version the version
* @param versionType the version type
* @param origin the operation origin
* @param ifSeqNo the ifSeqNo
* @param ifPrimaryTerm the ifPrimaryTerm
* @return the prepared delete operation
*/
public Engine.Delete prepareDelete(
String id,
long seqNo,
long primaryTerm,
long version,
VersionType versionType,
Engine.Operation.Origin origin,
long ifSeqNo,
long ifPrimaryTerm
) {
long startTime = System.nanoTime();
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
return new Engine.Delete(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm);
}

/**
* Index operation
*
Expand Down
16 changes: 14 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ private Engine.IndexResult applyIndexOperation(
ensureWriteAllowed(origin);
Engine.Index operation;
try {
operation = prepareIndex(
operation = engine.prepareIndex(
docMapper(),
sourceToParse,
seqNo,
Expand Down Expand Up @@ -1228,6 +1228,12 @@ private Engine.IndexResult applyIndexOperation(
return index(engine, operation);
}

/**
* Prepares an index operation by parsing the source document and creating an Engine.Index operation.
*
* @deprecated This static method has been moved to {@link Engine#prepareIndex} as an instance method.
*/
@Deprecated(since = "3.4.0", forRemoval = true)
public static Engine.Index prepareIndex(
DocumentMapperForType docMapper,
SourceToParse source,
Expand Down Expand Up @@ -1418,10 +1424,16 @@ private Engine.DeleteResult applyDeleteOperation(
+ getOperationPrimaryTerm()
+ "]";
ensureWriteAllowed(origin);
final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
final Engine.Delete delete = engine.prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
return delete(engine, delete);
}

/**
* Prepares a delete operation by creating an Engine.Delete operation.
*
* @deprecated This static method has been moved to {@link Engine#prepareDelete} as an instance method.
*/
@Deprecated(since = "3.4.0", forRemoval = true)
public static Engine.Delete prepareDelete(
String id,
long seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.index.mapper.RootObjectMapper;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogRecoveryRunner;
Expand Down Expand Up @@ -135,7 +134,7 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O
case INDEX:
final Translog.Index index = (Translog.Index) operation;
final String indexName = mapperService.index().getName();
final Engine.Index engineIndex = IndexShard.prepareIndex(
final Engine.Index engineIndex = engine.prepareIndex(
docMapper(MapperService.SINGLE_MAPPING_NAME),
new SourceToParse(
indexName,
Expand All @@ -157,7 +156,7 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O
return engineIndex;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
return IndexShard.prepareDelete(
return engine.prepareDelete(
delete.id(),
delete.seqNo(),
delete.primaryTerm(),
Expand Down
Loading