Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
*/
package org.elasticsearch.index.seqno;

import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

Expand Down Expand Up @@ -76,23 +80,7 @@ public void testSeqNoPrunedAfterMerge() throws Exception {
);

// waits for retention leases to advance past all docs
assertBusy(() -> {
for (var indicesServices : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (var indexService : indicesServices) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
for (RetentionLease lease : indexShard.getRetentionLeases().leases()) {
assertThat(
"retention lease [" + lease.id() + "] should have advanced",
lease.retainingSequenceNumber(),
equalTo(totalDocs)
);
}
}
}
}
}
});
assertRetentionLeasesAdvanced(indexName, totalDocs);

var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));
Expand Down Expand Up @@ -134,4 +122,185 @@ public void testSeqNoPrunedAfterMerge() throws Exception {
assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, false, peerRecovery ? 2 : 1);
}

public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception {
assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG);

internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final var indexName = randomIdentifier();
createIndex(
indexName,
indexSettings(1, 0).put(IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), true)
.put(IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build()
);
ensureGreen(indexName);

final int nbBatches = randomIntBetween(5, 10);
final int docsPerBatch = randomIntBetween(20, 50);
final long totalDocs = (long) nbBatches * docsPerBatch;

for (int batch = 0; batch < nbBatches; batch++) {
var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int doc = 0; doc < docsPerBatch; doc++) {
bulk.add(prepareIndex(indexName).setSource("field", "value-" + batch + "-" + doc));
}
assertNoFailures(bulk.get());
}

flushAndRefresh(indexName);

assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, true, 1);

assertThat(
indicesAdmin().prepareStats(indexName).clear().setSegments(true).get().getPrimaries().getSegments().getCount(),
greaterThan(1L)
);

// add a retention lease holding at a sequence number in the middle of the indexed range
final long maxSeqNo = indicesAdmin().prepareStats(indexName).get().getShards()[0].getSeqNoStats().getMaxSeqNo();
final long retentionLeaseSeqNo = randomLongBetween(1, maxSeqNo);
final var retentionLeaseId = randomIdentifier();
final var shardId = new ShardId(resolveIndex(indexName), 0);
client().execute(
RetentionLeaseActions.ADD,
new RetentionLeaseActions.AddRequest(shardId, retentionLeaseId, retentionLeaseSeqNo, "test")
).actionGet();

// wait for peer recovery retention leases to advance past all docs; the custom lease stays
assertBusy(() -> {
for (var indicesServices : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (var indexService : indicesServices) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
for (RetentionLease lease : indexShard.getRetentionLeases().leases()) {
if (lease.id().equals(retentionLeaseId)) {
assertThat(lease.retainingSequenceNumber(), equalTo(retentionLeaseSeqNo));
} else {
assertThat(
"retention lease [" + lease.id() + "] should have advanced",
lease.retainingSequenceNumber(),
equalTo(maxSeqNo + 1)
);
}
}
}
}
}
}
});

var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));

assertThat(
indicesAdmin().prepareStats(indexName).clear().setSegments(true).get().getPrimaries().getSegments().getCount(),
equalTo(1L)
);

refresh(indexName);

assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
// seq_no doc values should still be present because the custom retention lease prevented full pruning
assertShardsHaveSeqNoDocValues(indexName, true, 1);

// verify only docs with seq_no >= retentionLeaseSeqNo retained their doc values
final long expectedRetainedDocs = maxSeqNo + 1 - retentionLeaseSeqNo;

int checkedShards = 0;
for (var indicesServices : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (var indexService : indicesServices) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
Long docsWithSeqNoOnShard = indexShard.withEngineOrNull(engine -> {
if (engine == null) {
return null;
}
try (var searcher = engine.acquireSearcher("assert_seq_no_count")) {
long nbDocsWithSeqNo = 0;
for (var leaf : searcher.getLeafContexts()) {
NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
if (seqNoDV != null) {
while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
nbDocsWithSeqNo++;
}
}
}
return nbDocsWithSeqNo;
} catch (IOException e) {
throw new AssertionError(e);
}
});
if (docsWithSeqNoOnShard != null) {
assertThat(
"docs with seq_no >= " + retentionLeaseSeqNo + " should retain doc values",
docsWithSeqNoOnShard,
equalTo(expectedRetainedDocs)
);
checkedShards++;
}
}
}
}
}
assertThat("expected to verify at least one shard", checkedShards, equalTo(1));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can remove the lease, index some data and force merge again to ensure that we eventually remove the seq_nos?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I pushed de3f874


// remove the custom retention lease, index more data and force merge again to verify full pruning
client().execute(RetentionLeaseActions.REMOVE, new RetentionLeaseActions.RemoveRequest(shardId, retentionLeaseId)).actionGet();

var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int doc = 0; doc < docsPerBatch; doc++) {
bulk.add(prepareIndex(indexName).setSource("field", "value-extra-" + doc));
}
assertNoFailures(bulk.get());

flushAndRefresh(indexName);

final long newMaxSeqNo = indicesAdmin().prepareStats(indexName).get().getShards()[0].getSeqNoStats().getMaxSeqNo();

// wait for all retention leases to advance past all docs
assertRetentionLeasesAdvanced(indexName, newMaxSeqNo + 1);

forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));

assertThat(
indicesAdmin().prepareStats(indexName).clear().setSegments(true).get().getPrimaries().getSegments().getCount(),
equalTo(1L)
);

refresh(indexName);

assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs + docsPerBatch);
assertShardsHaveSeqNoDocValues(indexName, false, 1);
}

/**
 * Busy-waits until every retention lease on every copy of the given index reports a
 * retaining sequence number equal to {@code expectedRetainingSeqNo}.
 *
 * @param indexName              the index whose shards' retention leases are inspected
 * @param expectedRetainingSeqNo the retaining sequence number every lease must reach
 * @throws Exception if the condition is not met within the {@code assertBusy} timeout
 */
private static void assertRetentionLeasesAdvanced(String indexName, long expectedRetainingSeqNo) throws Exception {
    assertBusy(() -> {
        for (var indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
            for (var indexService : indicesService) {
                // only look at shards belonging to the index under test
                if (indexService.index().getName().equals(indexName) == false) {
                    continue;
                }
                for (var shard : indexService) {
                    for (RetentionLease lease : shard.getRetentionLeases().leases()) {
                        assertThat(
                            "retention lease [" + lease.id() + "] should have advanced",
                            lease.retainingSequenceNumber(),
                            equalTo(expectedRetainingSeqNo)
                        );
                    }
                }
            }
        }
    });
}
}