diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java index 00bf70cd11b1b..dfd1af8c67207 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.List; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -84,7 +85,7 @@ public void testSeqNoPrunedAfterMerge() throws Exception { ); // waits for retention leases to advance past all docs - assertRetentionLeasesAdvanced(indexName, totalDocs); + assertRetentionLeasesAdvanced(client(), indexName, totalDocs); var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); assertThat(forceMerge.getFailedShards(), equalTo(0)); @@ -268,7 +269,7 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception { final long newMaxSeqNo = indicesAdmin().prepareStats(indexName).get().getShards()[0].getSeqNoStats().getMaxSeqNo(); // wait for all retention leases to advance past all docs - assertRetentionLeasesAdvanced(indexName, newMaxSeqNo + 1); + assertRetentionLeasesAdvanced(client(), indexName, newMaxSeqNo + 1); forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); assertThat(forceMerge.getFailedShards(), equalTo(0)); @@ -284,30 +285,6 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception { assertShardsHaveSeqNoDocValues(indexName, false, 1); } - /** - * Waits for all retention leases on all copies 
of the given index to have their retaining sequence number - * equal to the expected value. - */ - private static void assertRetentionLeasesAdvanced(String indexName, long expectedRetainingSeqNo) throws Exception { - assertBusy(() -> { - for (var indicesServices : internalCluster().getDataNodeInstances(IndicesService.class)) { - for (var indexService : indicesServices) { - if (indexService.index().getName().equals(indexName)) { - for (var indexShard : indexService) { - for (RetentionLease lease : indexShard.getRetentionLeases().leases()) { - assertThat( - "retention lease [" + lease.id() + "] should have advanced", - lease.retainingSequenceNumber(), - equalTo(expectedRetainingSeqNo) - ); - } - } - } - } - } - }); - } - public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception { assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); @@ -363,7 +340,7 @@ public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception { greaterThan(1L) ); - assertRetentionLeasesAdvanced(indexName, totalDocs); + assertRetentionLeasesAdvanced(client(), indexName, totalDocs); var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); assertNoFailures(forceMerge); diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java index 3625df19d698b..52e3d6e06654d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java @@ -10,9 +10,11 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.DocIdSetIterator; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.indices.IndicesService; import 
org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import java.io.IOException; @@ -30,14 +32,32 @@ private SequenceNumbersTestUtils() {} /** * Asserts that all shards of the given index either have or lack {@code _seq_no} doc values on disk. + * Uses the default {@link ESIntegTestCase#internalCluster()}. * * @param indexName the index to check * @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty * @param expectedShards the exact number of shards expected to be verified */ public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expectDocValuesOnDisk, int expectedShards) { + assertShardsHaveSeqNoDocValues(ESIntegTestCase.internalCluster(), indexName, expectDocValuesOnDisk, expectedShards); + } + + /** + * Asserts that all shards of the given index on the given cluster either have or lack {@code _seq_no} doc values on disk. + * + * @param cluster the cluster to check + * @param indexName the index to check + * @param expectDocValuesOnDisk {@code true} to assert doc values are present, {@code false} to assert they are empty + * @param expectedShards the exact number of shards expected to be verified + */ + public static void assertShardsHaveSeqNoDocValues( + InternalTestCluster cluster, + String indexName, + boolean expectDocValuesOnDisk, + int expectedShards + ) { int nbCheckedShards = 0; - for (var indicesServices : ESIntegTestCase.internalCluster().getDataNodeInstances(IndicesService.class)) { + for (var indicesServices : cluster.getDataNodeInstances(IndicesService.class)) { for (var indexService : indicesServices) { if (indexService.index().getName().equals(indexName)) { for (var indexShard : indexService) { @@ -75,4 +95,76 @@ public static void assertShardsHaveSeqNoDocValues(String indexName, boolean expe } assertThat("expected to verify " + expectedShards + " shard(s)", nbCheckedShards, equalTo(expectedShards)); } + + /** + * Asserts that 
each shard of the given index holds exactly the expected number of {@code _seq_no} doc values. + * + * @param cluster the cluster to check + * @param indexName the index to check + * @param expectedCount the expected total number of doc values per shard + * @param expectedShards the exact number of shards expected to be verified + */ + public static void assertShardsSeqNoDocValuesCount( + InternalTestCluster cluster, + String indexName, + long expectedCount, + int expectedShards + ) { + int checked = 0; + for (IndicesService indicesService : cluster.getDataNodeInstances(IndicesService.class)) { + for (var indexService : indicesService) { + if (indexService.index().getName().equals(indexName)) { + for (var indexShard : indexService) { + Long count = indexShard.withEngineOrNull(engine -> { + if (engine == null) { + return null; + } + try (var searcher = engine.acquireSearcher("assert_seq_no_count")) { + long total = 0; + for (var leaf : searcher.getLeafContexts()) { + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (seqNoDV != null) { + while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + total++; + } + } + } + return total; + } catch (IOException e) { + throw new AssertionError(e); + } + }); + if (count != null) { + assertThat("retained seq_no doc values count", count, equalTo(expectedCount)); + checked++; + } + } + } + } + } + assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards)); + } + + /** + * Waits until all retention leases on all shards of the given index have their retaining sequence number + * equal to the expected value. 
+ * + * @param client the client to use for stats requests + * @param indexName the index to check + * @param expectedRetainingSeqNo the expected retaining sequence number for every lease + */ + public static void assertRetentionLeasesAdvanced(Client client, String indexName, long expectedRetainingSeqNo) throws Exception { + ESIntegTestCase.assertBusy(() -> { + var stats = client.admin().indices().prepareStats(indexName).get(); + for (var shardStats : stats.getShards()) { + for (RetentionLease lease : shardStats.getRetentionLeaseStats().retentionLeases().leases()) { + assertThat( + "retention lease [" + lease.id() + "] should have advanced", + lease.retainingSequenceNumber(), + equalTo(expectedRetainingSeqNo) + ); + } + } + }); + } } diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java new file mode 100644 index 0000000000000..d261e27a404f8 --- /dev/null +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrSeqNoPruningIT.java @@ -0,0 +1,261 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; + +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsSeqNoDocValuesCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +public class CcrSeqNoPruningIT extends CcrIntegTestCase { + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(CcrRetentionLeaseIT.RetentionLeaseRenewIntervalSettingPlugin.class)) + .collect(Collectors.toList()); + } + + @Override + protected Settings 
followerClusterSettings() { + return Settings.builder() + .put(super.followerClusterSettings()) + .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(100)) + .build(); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + public void testSeqNoPrunedOnLeaderAfterFollowerCatchesUp() throws Exception { + assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); + + final var leaderIndex = randomIdentifier(); + final var followerIndex = "follower-" + leaderIndex; + final int numberOfShards = 1; + + final var additionalSettings = Map.of( + IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), + "true", + IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), + SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY.toString(), + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(100).getStringRep() + ); + + final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, additionalSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderGreen(leaderIndex); + + final int nbBatches = randomIntBetween(5, 10); + final int docsPerBatch = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch + doc)); + } + assertNoFailures(bulk.get()); + } + + flush(leaderClient(), leaderIndex); + + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, true, numberOfShards); + assertThat( + leaderClient().admin() + .indices() + .prepareStats(leaderIndex) + .clear() + .setSegments(true) + .get() + .getPrimaries() + .getSegments() + .getCount(), + greaterThan(1L) + ); + + 
followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex)).get(); + + final long maxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, maxSeqNo + 1); + + var forceMerge = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + assertThat( + leaderClient().admin() + .indices() + .prepareStats(leaderIndex) + .clear() + .setSegments(true) + .get() + .getPrimaries() + .getSegments() + .getCount(), + equalTo(1L) + ); + + refresh(leaderClient(), leaderIndex); + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, false, numberOfShards); + + ensureFollowerGreen(true, followerIndex); + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + } + + public void testSeqNoPartiallyRetainedByCcrLease() throws Exception { + assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); + + final var leaderIndex = randomIdentifier(); + final var followerIndex = "follower-" + leaderIndex; + final int numberOfShards = 1; + + final var additionalSettings = Map.of( + IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), + "true", + IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), + SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY.toString(), + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(100).getStringRep() + ); + + final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, additionalSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderGreen(leaderIndex); + + followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex)).get(); + ensureFollowerGreen(true, followerIndex); + + final int nbBatches = randomIntBetween(5, 10); + final int docsPerBatch 
= randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch + doc)); + } + assertNoFailures(bulk.get()); + } + flush(leaderClient(), leaderIndex); + + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + final long leaseSeqNoBeforePause = getMaxSeqNo(leaderClient(), leaderIndex) + 1; + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, leaseSeqNoBeforePause); + pauseFollow(followerIndex); + + final int nbBatches2 = randomIntBetween(5, 10); + final int docsPerBatch2 = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches2; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch2; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch2 + doc)); + } + assertNoFailures(bulk.get()); + } + flush(leaderClient(), leaderIndex); + + final long newMaxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); + assertBusy(() -> { + var stats = leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + for (ShardStats shardStats : stats.getShards()) { + var ccrLeases = RetentionLeaseUtils.toMapExcludingPeerRecoveryRetentionLeases( + shardStats.getRetentionLeaseStats().retentionLeases() + ); + assertThat(ccrLeases.values(), hasSize(1)); + RetentionLease ccrLease = ccrLeases.values().iterator().next(); + assertThat(ccrLease.retainingSequenceNumber(), equalTo(leaseSeqNoBeforePause)); + + for (RetentionLease lease : shardStats.getRetentionLeaseStats().retentionLeases().leases()) { + if (lease.id().equals(ccrLease.id()) == false) { + assertThat( + "peer recovery lease [" + lease.id() + "] should have advanced", + 
lease.retainingSequenceNumber(), + equalTo(newMaxSeqNo + 1) + ); + } + } + } + }); + + var forceMerge = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + flush(leaderClient(), leaderIndex); + + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, true, numberOfShards); + + final long expectedRetainedDocs = newMaxSeqNo + 1 - leaseSeqNoBeforePause; + assertShardsSeqNoDocValuesCount(getLeaderCluster(), leaderIndex, expectedRetainedDocs, numberOfShards); + + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet(); + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, newMaxSeqNo + 1); + + final int nbBatches3 = randomIntBetween(5, 10); + final int docsPerBatch3 = randomIntBetween(20, 50); + + for (int batch = 0; batch < nbBatches3; batch++) { + var bulk = leaderClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch3; doc++) { + bulk.add(leaderClient().prepareIndex(leaderIndex).setSource("f", batch * docsPerBatch3 + doc)); + } + assertNoFailures(bulk.get()); + } + flush(leaderClient(), leaderIndex); + + final long finalMaxSeqNo = getMaxSeqNo(leaderClient(), leaderIndex); + + assertRetentionLeasesAdvanced(leaderClient(), leaderIndex, finalMaxSeqNo + 1); + + forceMerge = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + refresh(leaderClient(), leaderIndex); + + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + assertShardsHaveSeqNoDocValues(getLeaderCluster(), leaderIndex, false, numberOfShards); + + final var newFollowerIndex = "new-follower-" + leaderIndex; + followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, newFollowerIndex)).get(); + 
ensureFollowerGreen(true, newFollowerIndex); + + assertIndexFullyReplicatedToFollower(leaderIndex, newFollowerIndex); + } + + private static long getMaxSeqNo(Client client, String index) { + return client.admin().indices().prepareStats(index).get().getShards()[0].getSeqNoStats().getMaxSeqNo(); + } + +}