Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -27,7 +28,9 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced;
import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues;
Expand Down Expand Up @@ -348,4 +351,69 @@ public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception {
assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, false, 1);
}

/**
 * Verifies that index and delete operations succeed on replicas after _seq_no doc values have been pruned
 * by a force merge.
 */
public void testWritesSucceedOnReplicaAfterSeqNoPruning() throws Exception {
// Only meaningful when the disable_sequence_numbers feature flag is set; skipped otherwise.
assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG);

// Dedicated master plus two data nodes so a 1-primary / 1-replica index can place
// each copy on its own data node.
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final var indexName = randomIdentifier();
// Sequence numbers disabled, _seq_no kept as doc values only. The short retention-lease
// sync interval speeds up lease advancement so the later pruning assertion can hold.
createIndex(
indexName,
indexSettings(1, 1).put(IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), true)
.put(IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build()
);
ensureGreen(indexName);

// Index several immediately-refreshed bulk batches so multiple segments exist before the merge.
final int nbBatches = randomIntBetween(5, 10);
final int docsPerBatch = randomIntBetween(20, 50);
final int totalDocs = nbBatches * docsPerBatch;

// Track every indexed id so deletes/updates below target real documents.
final Set<String> docsIds = new HashSet<>();
for (int batch = 0; batch < nbBatches; batch++) {
var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int doc = 0; doc < docsPerBatch; doc++) {
var docId = "doc-" + (batch * docsPerBatch + doc);
bulk.add(prepareIndex(indexName).setId(docId).setSource("field", "value-for-" + docId));
docsIds.add(docId);
}
assertNoFailures(bulk.get());
}

flushAndRefresh(indexName);

// Before the merge both shard copies (primary + replica) still carry _seq_no doc values.
assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, true, 2);

// Wait for retention leases to advance past all indexed ops so the merge may prune _seq_no.
assertRetentionLeasesAdvanced(client(), indexName, totalDocs);

// Force merge to a single segment; this is what triggers the _seq_no doc-values pruning.
var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));

refresh(indexName);
// After the merge, neither shard copy should have _seq_no doc values any more.
assertShardsHaveSeqNoDocValues(indexName, false, 2);

// Deletes of existing docs must still be replicated correctly after pruning.
var deletedIds = randomSubsetOf(randomIntBetween(5, Math.min(20, totalDocs)), docsIds);
for (var docId : deletedIds) {
var response = client().prepareDelete(indexName, docId).get();
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.DELETED));
}

// Re-indexing must also work: a doc deleted above comes back as CREATED, an untouched one as UPDATED.
var updatedIds = randomSubsetOf(randomIntBetween(5, Math.min(20, totalDocs)), docsIds);
for (var docId : updatedIds) {
var response = prepareIndex(indexName).setId(docId).setSource("field", "updated").get();
var expectedResult = deletedIds.contains(docId) ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED;
assertThat(response.getResult(), equalTo(expectedResult));
}

// Replica must have applied all operations without failing; index stays green.
ensureGreen(indexName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,12 @@ private static long readNumericDocValues(LeafReader reader, String field, int do
}

/** Return null if id is not found. */
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
DocIdAndSeqNo lookupDocIdAndSeqNo(BytesRef id, LeafReaderContext context, boolean loadSeqNo) throws IOException {
assert readerKey == null || context.reader().getCoreCacheHelper().getKey().equals(readerKey)
: "context's reader is not the same as the reader class was initialized on.";
final int docID = getDocID(id, context);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
final long seqNo = loadSeqNo ? readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID) : UNASSIGNED_SEQ_NO;
return new DocIdAndSeqNo(docID, seqNo, context);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,23 @@ public static DocIdAndVersion loadDocIdAndVersionUncached(IndexReader reader, By
* The result is either null or the live and latest version of the given uid.
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, BytesRef term) throws IOException {
// Convenience overload: always read the _seq_no doc value (loadSeqNo=true),
// preserving the historical behavior of this method for existing callers.
return loadDocIdAndSeqNo(reader, term, true);
}

/**
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
* When {@code loadSeqNo} is false, {@code UNASSIGNED_SEQ_NO} is returned instead of reading the doc value.
* The result is either null or the live and latest version of the given uid.
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, BytesRef term, boolean loadSeqNo) throws IOException {
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, false);
final List<LeafReaderContext> leaves = reader.leaves();
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
final DocIdAndSeqNo result = lookup.lookupSeqNo(term, leaf);
final DocIdAndSeqNo result = lookup.lookupDocIdAndSeqNo(term, leaf, loadSeqNo);
if (result != null) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,8 +1039,13 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
} else {
// load from index
assert incrementIndexVersionLookup();
final boolean loadSeqNo = engineConfig.getIndexSettings().sequenceNumbersDisabled() == false;
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), op.uid());
final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(
searcher.getIndexReader(),
op.uid(),
loadSeqNo
);
if (docAndSeqNo == null) {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else if (op.seqNo() > docAndSeqNo.seqNo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4888,6 +4888,22 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
}
}

/**
 * Checks that {@code loadDocIdAndSeqNo} with {@code loadSeqNo=false} resolves the same
 * document as the {@code loadSeqNo=true} variant but reports {@code UNASSIGNED_SEQ_NO}
 * instead of reading the _seq_no doc value.
 */
public void testLoadDocIdAndSeqNoWithLoadSeqNoFalse() throws IOException {
// Index a single doc and refresh so it is visible to the searcher below.
engine.index(indexForDoc(createParsedDoc("1", null)));
engine.refresh("test");

try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
// With loadSeqNo=true the real (non-negative) sequence number is returned.
DocIdAndSeqNo withSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), Uid.encodeId("1"), true);
assertNotNull(withSeqNo);
assertThat(withSeqNo.seqNo, greaterThanOrEqualTo(0L));

// With loadSeqNo=false the lookup skips the doc value read: same docId, sentinel seqNo.
DocIdAndSeqNo withoutSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), Uid.encodeId("1"), false);
assertNotNull(withoutSeqNo);
assertThat(withoutSeqNo.seqNo, equalTo(UNASSIGNED_SEQ_NO));
assertThat(withoutSeqNo.docId, equalTo(withSeqNo.docId));
}
}

/**
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
Expand Down
Loading