Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced;
import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void testSeqNoPrunedAfterMerge() throws Exception {
);

// waits for retention leases to advance past all docs
assertRetentionLeasesAdvanced(indexName, totalDocs);
assertRetentionLeasesAdvanced(client(), indexName, totalDocs);

var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));
Expand Down Expand Up @@ -268,7 +269,7 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception {
final long newMaxSeqNo = indicesAdmin().prepareStats(indexName).get().getShards()[0].getSeqNoStats().getMaxSeqNo();

// wait for all retention leases to advance past all docs
assertRetentionLeasesAdvanced(indexName, newMaxSeqNo + 1);
assertRetentionLeasesAdvanced(client(), indexName, newMaxSeqNo + 1);

forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertThat(forceMerge.getFailedShards(), equalTo(0));
Expand All @@ -284,30 +285,6 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception {
assertShardsHaveSeqNoDocValues(indexName, false, 1);
}

/**
 * Waits until every retention lease on every copy of the given index reports a retaining
 * sequence number equal to {@code expectedRetainingSeqNo}. Polls via {@code assertBusy}
 * because lease advancement happens asynchronously in the background sync.
 *
 * @param indexName the index whose shard copies are inspected
 * @param expectedRetainingSeqNo the retaining sequence number every lease must reach
 * @throws Exception if the condition is not met within the {@code assertBusy} timeout
 */
private static void assertRetentionLeasesAdvanced(String indexName, long expectedRetainingSeqNo) throws Exception {
    assertBusy(() -> {
        // Inspect every data node, since shard copies may live on any of them.
        for (var indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
            for (var indexService : indicesService) {
                if (indexService.index().getName().equals(indexName) == false) {
                    continue; // skip unrelated indices hosted on the same node
                }
                for (var indexShard : indexService) {
                    for (RetentionLease lease : indexShard.getRetentionLeases().leases()) {
                        var description = "retention lease [" + lease.id() + "] should have advanced";
                        assertThat(description, lease.retainingSequenceNumber(), equalTo(expectedRetainingSeqNo));
                    }
                }
            }
        }
    });
}

public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception {
assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG);

Expand Down Expand Up @@ -363,7 +340,7 @@ public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception {
greaterThan(1L)
);

assertRetentionLeasesAdvanced(indexName, totalDocs);
assertRetentionLeasesAdvanced(client(), indexName, totalDocs);

var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
assertNoFailures(forceMerge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.io.IOException;

Expand All @@ -30,14 +32,32 @@ private SequenceNumbersTestUtils() {}

/**
 * Convenience overload of
 * {@link #assertShardsHaveSeqNoDocValues(InternalTestCluster, String, boolean, int)} that
 * runs against the default {@link ESIntegTestCase#internalCluster()}.
 *
 * @param indexName the index to check
 * @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty
 * @param expectedShards the exact number of shards expected to be verified
 */
public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expectDocValuesOnDisk, int expectedShards) {
    // Delegate to the cluster-aware variant so there is a single implementation to maintain.
    assertShardsHaveSeqNoDocValues(ESIntegTestCase.internalCluster(), indexName, expectDocValuesOnDisk, expectedShards);
}

/**
* Asserts that all shards of the given index on the given cluster either have or lack {@code _seq_no} doc values on disk.
*
* @param cluster the cluster to check
* @param indexName the index to check
* @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty
* @param expectedShards the exact number of shards expected to be verified
*/
public static void assertShardsHaveSeqNoDocValues(
InternalTestCluster cluster,
String indexName,
boolean expectDocValuesOnDisk,
int expectedShards
) {
int nbCheckedShards = 0;
for (var indicesServices : ESIntegTestCase.internalCluster().getDataNodeInstances(IndicesService.class)) {
for (var indicesServices : cluster.getDataNodeInstances(IndicesService.class)) {
for (var indexService : indicesServices) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
Expand Down Expand Up @@ -75,4 +95,76 @@ public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expe
}
assertThat("expected to verify " + expectedShards + " shard(s)", nbCheckedShards, equalTo(expectedShards));
}

/**
 * Asserts that each started shard copy of the given index holds exactly {@code expectedCount}
 * {@code _seq_no} doc values, counted across all of the shard's segments.
 *
 * <p>Note: despite counting per shard, the check is applied to every shard copy individually —
 * the same {@code expectedCount} must hold for each one.
 *
 * @param cluster the cluster to check
 * @param indexName the index to check
 * @param expectedCount the expected number of {@code _seq_no} doc values in every shard copy
 * @param expectedShards the exact number of shard copies expected to be verified
 */
public static void assertShardsSeqNoDocValuesCount(
    InternalTestCluster cluster,
    String indexName,
    long expectedCount,
    int expectedShards
) {
    int checked = 0;
    for (IndicesService indicesService : cluster.getDataNodeInstances(IndicesService.class)) {
        for (var indexService : indicesService) {
            if (indexService.index().getName().equals(indexName)) {
                for (var indexShard : indexService) {
                    // Returns null when the shard has no engine (e.g. not yet started/recovered);
                    // such copies are skipped rather than failing the assertion.
                    Long count = indexShard.withEngineOrNull(engine -> {
                        if (engine == null) {
                            return null;
                        }
                        try (var searcher = engine.acquireSearcher("assert_seq_no_count")) {
                            long total = 0;
                            // Sum matching doc-value entries over every segment of the shard.
                            for (var leaf : searcher.getLeafContexts()) {
                                NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
                                if (seqNoDV != null) {
                                    // Iterate the doc-values iterator to count docs that actually carry a value.
                                    while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                                        total++;
                                    }
                                }
                            }
                            return total;
                        } catch (IOException e) {
                            // Surface I/O problems as test failures rather than swallowing them.
                            throw new AssertionError(e);
                        }
                    });
                    if (count != null) {
                        assertThat("retained seq_no doc values count", count, equalTo(expectedCount));
                        checked++;
                    }
                }
            }
        }
    }
    // Guard against silently verifying fewer copies than the test intended (e.g. a relocating shard).
    assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards));
}

/**
 * Blocks until every retention lease reported by the index stats API for the given index
 * has a retaining sequence number equal to {@code expectedRetainingSeqNo}.
 *
 * @param client the client to use for stats requests
 * @param indexName the index to check
 * @param expectedRetainingSeqNo the expected retaining sequence number for every lease
 * @throws Exception if the condition is not met within the {@code assertBusy} timeout
 */
public static void assertRetentionLeasesAdvanced(Client client, String indexName, long expectedRetainingSeqNo) throws Exception {
    ESIntegTestCase.assertBusy(() -> {
        // Re-fetch stats on each poll; leases advance asynchronously in the background.
        var response = client.admin().indices().prepareStats(indexName).get();
        for (var shardStats : response.getShards()) {
            var leases = shardStats.getRetentionLeaseStats().retentionLeases().leases();
            for (RetentionLease lease : leases) {
                var description = "retention lease [" + lease.id() + "] should have advanced";
                assertThat(description, lease.retainingSequenceNumber(), equalTo(expectedRetainingSeqNo));
            }
        }
    });
}
}
Loading
Loading