Skip to content

Commit 688cf83

Browse files
authored
Enable indexing optimization using sequence numbers on replicas (#43616)
This PR enables the indexing optimization using sequence numbers on replicas. With this optimization, indexing on replicas should be faster and use less memory as it can forgo the version lookup when possible. This change also deactivates the append-only optimization on replicas. Relates #34099
1 parent a56f020 commit 688cf83

File tree

10 files changed

+169
-227
lines changed

10 files changed

+169
-227
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 30 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public class InternalEngine extends Engine {
156156
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
157157
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
158158
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
159-
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
160159
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
161160
// An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
162161
// The value of this marker never goes backwards, and is tracked/updated differently on primary and replica.
@@ -409,17 +408,11 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
409408

410409
private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
411410
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
412-
final String key = entry.getKey();
413-
if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
411+
if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) {
414412
assert maxUnsafeAutoIdTimestamp.get() == -1 :
415413
"max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
416414
updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true);
417415
}
418-
if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
419-
assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
420-
"max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
421-
maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
422-
}
423416
}
424417
}
425418

@@ -946,46 +939,35 @@ public IndexResult index(Index index) throws IOException {
946939

947940
protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
948941
assert assertNonPrimaryOrigin(index);
942+
// needs to maintain the auto_id timestamp in case this replica becomes primary
943+
if (canOptimizeAddDocument(index)) {
944+
mayHaveBeenIndexedBefore(index);
945+
}
949946
final IndexingStrategy plan;
950-
final boolean appendOnlyRequest = canOptimizeAddDocument(index);
951-
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
952-
/*
953-
* As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue
954-
* a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before
955-
* the original append-only. In this case we can't simply proceed with the append only without consulting the version map.
956-
* If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen
957-
* the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only
958-
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
959-
*/
960-
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
961-
plan = IndexingStrategy.optimizedAppendOnly(1L);
947+
// unlike the primary, replicas don't really care to about creation status of documents
948+
// this allows to ignore the case where a document was found in the live version maps in
949+
// a delete state and return false for the created flag in favor of code simplicity
950+
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
951+
if (hasBeenProcessedBefore(index)) {
952+
// the operation seq# was processed and thus the same operation was already put into lucene
953+
// this can happen during recovery where older operations are sent from the translog that are already
954+
// part of the lucene commit (either from a peer recovery or a local translog)
955+
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
956+
// question may have been deleted in an out of order op that is not replayed.
957+
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
958+
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
959+
plan = IndexingStrategy.processButSkipLucene(false, index.version());
960+
} else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) {
961+
// see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers
962+
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
963+
plan = IndexingStrategy.optimizedAppendOnly(index.version());
962964
} else {
963-
if (appendOnlyRequest == false) {
964-
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
965-
assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" +
966-
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
967-
}
968965
versionMap.enforceSafeAccess();
969-
// unlike the primary, replicas don't really care to about creation status of documents
970-
// this allows to ignore the case where a document was found in the live version maps in
971-
// a delete state and return false for the created flag in favor of code simplicity
972-
if (index.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()){
973-
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
974-
// this can happen during recovery where older operations are sent from the translog that are already
975-
// part of the lucene commit (either from a peer recovery or a local translog)
976-
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
977-
// question may have been deleted in an out of order op that is not replayed.
978-
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
979-
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
980-
plan = IndexingStrategy.processButSkipLucene(false, index.version());
966+
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
967+
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
968+
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
981969
} else {
982-
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
983-
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
984-
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
985-
} else {
986-
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
987-
index.version());
988-
}
970+
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version());
989971
}
990972
}
991973
return plan;
@@ -1115,11 +1097,6 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
11151097
return mayHaveBeenIndexBefore;
11161098
}
11171099

1118-
// for testing
1119-
long getMaxSeqNoOfNonAppendOnlyOperations() {
1120-
return maxSeqNoOfNonAppendOnlyOperations.get();
1121-
}
1122-
11231100
private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
11241101
if (docs.size() > 1) {
11251102
indexWriter.addDocuments(docs);
@@ -1168,7 +1145,7 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
11681145
Optional.of(earlyResultOnPreFlightError);
11691146
}
11701147

1171-
public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
1148+
static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
11721149
return new IndexingStrategy(true, false, true, false, versionForIndexing, null);
11731150
}
11741151

@@ -1313,15 +1290,9 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr
13131290

13141291
protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
13151292
assert assertNonPrimaryOrigin(delete);
1316-
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
1317-
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
1318-
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
1319-
// unlike the primary, replicas don't really care to about found status of documents
1320-
// this allows to ignore the case where a document was found in the live version maps in
1321-
// a delete state and return true for the found flag in favor of code simplicity
13221293
final DeletionStrategy plan;
1323-
if (delete.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()) {
1324-
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
1294+
if (hasBeenProcessedBefore(delete)) {
1295+
// the operation seq# was processed thus this operation was already put into lucene
13251296
// this can happen during recovery where older operations are sent from the translog that are already
13261297
// part of the lucene commit (either from a peer recovery or a local translog)
13271298
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
@@ -1498,7 +1469,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
14981469
noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get());
14991470
} else {
15001471
markSeqNoAsSeen(noOp.seqNo());
1501-
if (softDeleteEnabled) {
1472+
if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) {
15021473
try {
15031474
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
15041475
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());

0 commit comments

Comments
 (0)