diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java index 0aeb3828e389e..69240e848ef44 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/SeqNoPruningIT.java @@ -15,15 +15,20 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; import java.time.Instant; @@ -31,9 +36,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced; import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues; +import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsSeqNoDocValuesCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -45,7 +53,7 @@ public class SeqNoPruningIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(InternalSettingsPlugin.class); + return List.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class); } public void testSeqNoPrunedAfterMerge() throws Exception { @@ -288,6 +296,133 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception { assertShardsHaveSeqNoDocValues(indexName, false, 1); } + /** + * Verifies that seq_no doc values are retained on the primary during an in-progress peer recovery. + * The recovering replica's retention lease prevents the merge policy from pruning seq_no doc values for operations + * that still need to be replayed to the replica. + */ + public void testSeqNoRetainedDuringInProgressRecovery() throws Exception { + assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); + + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(2); + ensureStableCluster(3); + + final var indexName = randomIdentifier(); + createIndex( + indexName, + indexSettings(1, 1).put(IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), true) + .put(IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0) + .build() + ); + ensureGreen(indexName); + + final int docsPerBatch = randomIntBetween(20, 50); + for (int batch = 0; batch < 2; batch++) { + var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + bulk.add(prepareIndex(indexName).setSource("field", "value-first-" + batch + "-" + doc)); + } + assertNoFailures(bulk.get()); + flush(indexName); + } + refresh(indexName); + + final int firstBatchTotalDocs = 2 * docsPerBatch; + // Wait for leases to advance, then force merge so minRetainedSeqNo advances and first-batch seq_no values are pruned + assertRetentionLeasesAdvanced(client(), indexName, firstBatchTotalDocs); + assertThat(indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get().getFailedShards(), equalTo(0)); + + refresh(indexName); + + assertShardsHaveSeqNoDocValues(indexName, false, 2); + + var clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState(); + var primaryShard = clusterState.routingTable().index(indexName).shard(0).primaryShard(); + String primaryNodeName = clusterState.nodes().get(primaryShard.currentNodeId()).getName(); + String replicaNodeName = clusterState.routingTable() + .index(indexName) + .shard(0) + .assignedShards() + .stream() + .filter(s -> s.primary() == false) + .map(s -> clusterState.nodes().get(s.currentNodeId()).getName()) + .findFirst() + .orElseThrow(); + + final var replicaDataPathSettings = internalCluster().dataPathSettings(replicaNodeName); + internalCluster().stopNode(replicaNodeName); + + // Index a third batch while the replica is down — the replica's retention lease is now stale + var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < docsPerBatch; doc++) { + bulk.add(prepareIndex(indexName).setSource("field", "value-second-" + doc)); + } + assertNoFailures(bulk.get()); + flush(indexName); + + final long totalDocs = firstBatchTotalDocs + docsPerBatch; + assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs); + + CountDownLatch recoveryBlocked = new CountDownLatch(1); + CountDownLatch unblockRecovery = new CountDownLatch(1); + AtomicInteger translogOpsCount = new AtomicInteger(); + MockTransportService primaryTransport = MockTransportService.getInstance(primaryNodeName); + primaryTransport.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.TRANSLOG_OPS.equals(action)) { + if (translogOpsCount.incrementAndGet() > 1) { + recoveryBlocked.countDown(); + safeAwait(unblockRecovery); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + // Use a tiny chunk size so translog ops are sent in many small batches, + // allowing us to block recovery mid-way after some operations have already been replayed + updateClusterSettings(Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey(), ByteSizeValue.ofBytes(100))); + + try { + internalCluster().startDataOnlyNode(replicaDataPathSettings); + safeAwait(recoveryBlocked); + assertThat("at least one translog ops batch should have been sent before blocking", translogOpsCount.get(), greaterThan(1)); + + // Force merge on the primary — the retention lock held by the in-progress recovery prevents + // minRetainedSeqNo from advancing, so the third batch's seq_no doc values are preserved + var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); + assertThat(forceMerge.getFailedShards(), equalTo(0)); + + refresh(indexName); + + // The primary should still have seq_no doc values for the third batch — the recovery's retention lock + // prevents minRetainedSeqNo from advancing past the operations the replica still needs + var primaryIndicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName); + assertShardsSeqNoDocValuesCount(primaryIndicesService, indexName, docsPerBatch, 1); + } finally { + unblockRecovery.countDown(); + primaryTransport.clearAllRules(); + updateClusterSettings(Settings.builder().putNull(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey())); + } + + ensureGreen(indexName); + assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs); + + // Index one more doc and flush to guarantee every shard has at least 2 segments, so force merge actually triggers pruning + indexDoc(indexName, "extra", "field", "value-extra"); + flush(indexName); + + final long finalTotalDocs = totalDocs + 1; + assertRetentionLeasesAdvanced(client(), indexName, finalTotalDocs); + assertThat(indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get().getFailedShards(), equalTo(0)); + + refresh(indexName); + + assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), finalTotalDocs); + assertShardsHaveSeqNoDocValues(indexName, false, 2); + } + public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception { assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG); diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java index 52e3d6e06654d..1fc8a5ff46c9f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersTestUtils.java @@ -112,37 +112,62 @@ public static void assertShardsSeqNoDocValuesCount( ) { int checked = 0; for (IndicesService indicesService : cluster.getDataNodeInstances(IndicesService.class)) { - for (var indexService : indicesService) { - if (indexService.index().getName().equals(indexName)) { - for (var indexShard : indexService) { - Long count = indexShard.withEngineOrNull(engine -> { - if (engine == null) { - return null; - } - try (var searcher = engine.acquireSearcher("assert_seq_no_count")) { - long total = 0; - for (var leaf : searcher.getLeafContexts()) { - NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - if (seqNoDV != null) { - while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - total++; - } + checked += doAssertShardsSeqNoDocValuesCount(indicesService, indexName, expectedCount); + } + assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards)); + } + + /** + * Asserts that the total number of {@code _seq_no} doc values for each shard of the given index on a specific node + * equals the expected count. + * + * @param indicesService the {@link IndicesService} instance for the node to check + * @param indexName the index to check + * @param expectedCount the expected total number of doc values per shard + * @param expectedShards the exact number of shards expected to be verified + */ + public static void assertShardsSeqNoDocValuesCount( + IndicesService indicesService, + String indexName, + long expectedCount, + int expectedShards + ) { + int checked = doAssertShardsSeqNoDocValuesCount(indicesService, indexName, expectedCount); + assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards)); + } + + private static int doAssertShardsSeqNoDocValuesCount(IndicesService indicesService, String indexName, long expectedCount) { + int checked = 0; + for (var indexService : indicesService) { + if (indexService.index().getName().equals(indexName)) { + for (var indexShard : indexService) { + Long count = indexShard.withEngineOrNull(engine -> { + if (engine == null) { + return null; + } + try (var searcher = engine.acquireSearcher("assert_seq_no_count")) { + long total = 0; + for (var leaf : searcher.getLeafContexts()) { + NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + if (seqNoDV != null) { + while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + total++; } } - return total; - } catch (IOException e) { - throw new AssertionError(e); } - }); - if (count != null) { - assertThat("retained seq_no doc values count", count, equalTo(expectedCount)); - checked++; + return total; + } catch (IOException e) { + throw new AssertionError(e); } + }); + if (count != null) { + assertThat("retained seq_no doc values count", count, equalTo(expectedCount)); + checked++; } } } } - assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards)); + return checked; } /**