diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java index dfd1af8c67207..0aeb3828e389e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java @@ -10,6 +10,7 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.DocIdSetIterator; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -27,7 +28,9 @@ import java.io.IOException; import java.time.Instant; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; @@ -348,4 +351,69 @@ public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception { assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs); assertShardsHaveSeqNoDocValues(indexName, false, 1); } + + /** + * Verifies that index and delete operations succeed on replicas after _seq_no doc values have been pruned + * by a force merge. + */ + public void testWritesSucceedOnReplicaAfterSeqNoPruning() throws Exception { + assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); + + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(2); + ensureStableCluster(3); + + final var indexName = randomIdentifier(); + createIndex( + indexName, + indexSettings(1, 1).put(IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), true) + .put(IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .build() + ); + ensureGreen(indexName); + + final int nbBatches = randomIntBetween(5, 10); + final int docsPerBatch = randomIntBetween(20, 50); + final int totalDocs = nbBatches * docsPerBatch; + + final Set docsIds = new HashSet<>(); + for (int batch = 0; batch < nbBatches; batch++) { + var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + var docId = "doc-" + (batch * docsPerBatch + doc); + bulk.add(prepareIndex(indexName).setId(docId).setSource("field", "value-for-" + docId)); + docsIds.add(docId); + } + assertNoFailures(bulk.get()); + } + + flushAndRefresh(indexName); + + assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs); + assertShardsHaveSeqNoDocValues(indexName, true, 2); + + assertRetentionLeasesAdvanced(client(), indexName, totalDocs); + + var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + + refresh(indexName); + assertShardsHaveSeqNoDocValues(indexName, false, 2); + + var deletedIds = randomSubsetOf(randomIntBetween(5, Math.min(20, totalDocs)), docsIds); + for (var docId : deletedIds) { + var response = client().prepareDelete(indexName, docId).get(); + assertThat(response.getResult(), equalTo(DocWriteResponse.Result.DELETED)); + } + + var updatedIds = randomSubsetOf(randomIntBetween(5, Math.min(20, totalDocs)), docsIds); + for (var docId : updatedIds) { + var response = prepareIndex(indexName).setId(docId).setSource("field", "updated").get(); + var expectedResult = deletedIds.contains(docId) ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED; + assertThat(response.getResult(), equalTo(expectedResult)); + } + + ensureGreen(indexName); + } } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 19a527cf8648b..c73993f4883b5 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -185,12 +185,12 @@ private static long readNumericDocValues(LeafReader reader, String field, int do } /** Return null if id is not found. */ - DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException { + DocIdAndSeqNo lookupDocIdAndSeqNo(BytesRef id, LeafReaderContext context, boolean loadSeqNo) throws IOException { assert readerKey == null || context.reader().getCoreCacheHelper().getKey().equals(readerKey) : "context's reader is not the same as the reader class was initialized on."; final int docID = getDocID(id, context); if (docID != DocIdSetIterator.NO_MORE_DOCS) { - final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID); + final long seqNo = loadSeqNo ? readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID) : UNASSIGNED_SEQ_NO; return new DocIdAndSeqNo(docID, seqNo, context); } else { return null; diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 356606bb492c5..7ccd06fcf8412 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -219,6 +219,15 @@ public static DocIdAndVersion loadDocIdAndVersionUncached(IndexReader reader, By * The result is either null or the live and latest version of the given uid. */ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, BytesRef term) throws IOException { + return loadDocIdAndSeqNo(reader, term, true); + } + + /** + * Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader. + * When {@code loadSeqNo} is false, {@code UNASSIGNED_SEQ_NO} is returned instead of reading the doc value. + * The result is either null or the live and latest version of the given uid. + */ + public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, BytesRef term, boolean loadSeqNo) throws IOException { final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, false); final List leaves = reader.leaves(); // iterate backwards to optimize for the frequently updated documents @@ -226,7 +235,7 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, BytesRef term) for (int i = leaves.size() - 1; i >= 0; i--) { final LeafReaderContext leaf = leaves.get(i); final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; - final DocIdAndSeqNo result = lookup.lookupSeqNo(term, leaf); + final DocIdAndSeqNo result = lookup.lookupDocIdAndSeqNo(term, leaf, loadSeqNo); if (result != null) { return result; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 74fb42a2e669b..d27c457f3f1e4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1039,8 +1039,13 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) } else { // load from index assert incrementIndexVersionLookup(); + final boolean loadSeqNo = engineConfig.getIndexSettings().sequenceNumbersDisabled() == false; try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { - final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), op.uid()); + final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo( + searcher.getIndexReader(), + op.uid(), + loadSeqNo + ); if (docAndSeqNo == null) { status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else if (op.seqNo() > docAndSeqNo.seqNo) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 08e6f7c59c0bb..40465414f7021 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4888,6 +4888,22 @@ public void testLookupSeqNoByIdInLucene() throws Exception { } } + public void testLoadDocIdAndSeqNoWithLoadSeqNoFalse() throws IOException { + engine.index(indexForDoc(createParsedDoc("1", null))); + engine.refresh("test"); + + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + DocIdAndSeqNo withSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), Uid.encodeId("1"), true); + assertNotNull(withSeqNo); + assertThat(withSeqNo.seqNo, greaterThanOrEqualTo(0L)); + + DocIdAndSeqNo withoutSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), Uid.encodeId("1"), false); + assertNotNull(withoutSeqNo); + assertThat(withoutSeqNo.seqNo, equalTo(UNASSIGNED_SEQ_NO)); + assertThat(withoutSeqNo.docId, equalTo(withSeqNo.docId)); + } + } + /** * A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the * referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of