Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.index.seqno;

import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;

import java.util.Collection;
import java.util.List;

import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(numDataNodes = 0)
public class SeqNoPruningIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(InternalSettingsPlugin.class);
}

public void testSeqNoPrunedAfterMerge() throws Exception {
assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG);

internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final var indexName = randomIdentifier();
createIndex(
indexName,
indexSettings(1, 0).put(IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), true)
.put(IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build()
);
ensureGreen(indexName);

final int nbBatches = randomIntBetween(5, 10);
final int docsPerBatch = randomIntBetween(20, 50);
final long totalDocs = (long) nbBatches * docsPerBatch;

for (int batch = 0; batch < nbBatches; batch++) {
var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int doc = 0; doc < docsPerBatch; doc++) {
bulk.add(prepareIndex(indexName).setSource("field", "value-" + batch + "-" + doc));
}
assertNoFailures(bulk.get());
}

flushAndRefresh(indexName);

assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, true, 1);

assertThat(
indicesAdmin().prepareStats(indexName).clear().setSegments(true).get().getPrimaries().getSegments().getCount(),
greaterThan(1L)
);

// waits for retention leases to advance past all docs
assertBusy(() -> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to add a test where we create a retention lease to ensure that we do not remove sequence numbers from the retention lease seq_no onwards? It could be in a follow up 👍

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes perfect sense, I'll do it in a follow up.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #143825

for (var indicesServices : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (var indexService : indicesServices) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
for (RetentionLease lease : indexShard.getRetentionLeases().leases()) {
assertThat(
"retention lease [" + lease.id() + "] should have advanced",
lease.retainingSequenceNumber(),
equalTo(totalDocs)
);
}
}
}
}
}
});

var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));

assertThat(
indicesAdmin().prepareStats(indexName).clear().setSegments(true).get().getPrimaries().getSegments().getCount(),
equalTo(1L)
);

refresh(indexName);

assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, false, 1);

final boolean peerRecovery = randomBoolean();
if (peerRecovery) {
logger.info("--> triggering peer recovery by adding a replica");
setReplicaCount(1, indexName);
ensureGreen(indexName);
} else {
logger.info("--> triggering relocation via move allocation command");
var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
String sourceNode = primaryShard.currentNodeId();
String sourceNodeName = state.nodes().get(sourceNode).getName();
String targetNodeName = state.nodes()
.getDataNodes()
.values()
.stream()
.filter(n -> n.getName().equals(sourceNodeName) == false)
.findFirst()
.orElseThrow()
.getName();

ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, sourceNodeName, targetNodeName));
waitForRelocation();
}

assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);
assertShardsHaveSeqNoDocValues(indexName, false, peerRecovery ? 2 : 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.index.seqno;

import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;

/**
* Utilities for asserting on sequence number fields at the Lucene level in integration tests.
*/
public final class SequenceNumbersTestUtils {

    private SequenceNumbersTestUtils() {}

    /**
     * Asserts that all shards of the given index either have or lack {@code _seq_no} doc values on disk.
     *
     * <p>Every leaf (segment) of every searchable shard copy on every data node is inspected;
     * a shard only counts as verified when its engine was available and it had at least one segment.
     *
     * @param indexName the index to check
     * @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty
     * @param expectedShards the exact number of shard copies expected to be verified
     */
    public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expectDocValuesOnDisk, int expectedShards) {
        int nbCheckedShards = 0;
        for (var indicesServices : ESIntegTestCase.internalCluster().getDataNodeInstances(IndicesService.class)) {
            for (var indexService : indicesServices) {
                if (indexService.index().getName().equals(indexName)) {
                    for (var indexShard : indexService) {
                        final var shardId = indexShard.shardId();
                        var checked = indexShard.withEngineOrNull(engine -> {
                            if (engine == null) {
                                // shard not started / engine closed: not verifiable
                                return false;
                            }
                            boolean sawLeaf = false;
                            try (var searcher = engine.acquireSearcher("assert_seq_no_dv")) {
                                // NOTE: check EVERY leaf; an early return here would silently
                                // skip all segments after the first one.
                                for (var leaf : searcher.getLeafContexts()) {
                                    sawLeaf = true;
                                    var leafReader = leaf.reader();
                                    NumericDocValues seqNoDV = leafReader.getNumericDocValues(SeqNoFieldMapper.NAME);
                                    if (expectDocValuesOnDisk) {
                                        assertThat(shardId + " _seq_no doc values should be present", seqNoDV, notNullValue());
                                        // the iterator must yield at least one document
                                        assertThat(seqNoDV.nextDoc(), not(equalTo(DocIdSetIterator.NO_MORE_DOCS)));
                                    } else if (seqNoDV != null) {
                                        // pruned doc values may still expose an (empty) iterator
                                        assertThat(
                                            shardId + " _seq_no doc values should be empty",
                                            seqNoDV.nextDoc(),
                                            equalTo(DocIdSetIterator.NO_MORE_DOCS)
                                        );
                                    }
                                }
                            } catch (IOException ioe) {
                                throw new AssertionError(ioe);
                            }
                            // only count the shard if there was at least one segment to inspect
                            return sawLeaf;
                        });
                        if (checked) {
                            nbCheckedShards++;
                        }
                    }
                }
            }
        }
        assertThat("expected to verify " + expectedShards + " shard(s)", nbCheckedShards, equalTo(expectedShards));
    }
}