Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,33 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.transport.MockTransportService;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertRetentionLeasesAdvanced;
import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsHaveSeqNoDocValues;
import static org.elasticsearch.index.seqno.SequenceNumbersTestUtils.assertShardsSeqNoDocValuesCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
Expand All @@ -45,7 +53,7 @@ public class SeqNoPruningIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    // MockTransportService.TestPlugin is required so tests can install send-behavior hooks
    // (used to block peer-recovery traffic mid-flight) on node transports.
    return List.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class);
}

public void testSeqNoPrunedAfterMerge() throws Exception {
Expand Down Expand Up @@ -288,6 +296,133 @@ public void testSeqNoPartiallyPrunedWithRetentionLease() throws Exception {
assertShardsHaveSeqNoDocValues(indexName, false, 1);
}

/**
 * Verifies that seq_no doc values are retained on the primary during an in-progress peer recovery.
 * The recovering replica's retention lease prevents the merge policy from pruning seq_no doc values for operations
 * that still need to be replayed to the replica.
 */
public void testSeqNoRetainedDuringInProgressRecovery() throws Exception {
    assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG);

    internalCluster().startMasterOnlyNode();
    internalCluster().startDataOnlyNodes(2);
    ensureStableCluster(3);

    final var indexName = randomIdentifier();
    // 1 primary + 1 replica; a fast lease sync interval so retention leases advance quickly, and a
    // file-based recovery threshold of 1.0 to favor ops-based (translog replay) recovery when the
    // replica node rejoins — NOTE(review): threshold semantics assumed from setting name, confirm.
    createIndex(
        indexName,
        indexSettings(1, 1).put(IndexSettings.DISABLE_SEQUENCE_NUMBERS.getKey(), true)
            .put(IndexSettings.SEQ_NO_INDEX_OPTIONS_SETTING.getKey(), SeqNoFieldMapper.SeqNoIndexOptions.DOC_VALUES_ONLY)
            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
            .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0)
            .build()
    );
    ensureGreen(indexName);

    // Index two flushed batches so the shard starts with multiple segments carrying seq_no doc values
    final int docsPerBatch = randomIntBetween(20, 50);
    for (int batch = 0; batch < 2; batch++) {
        var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (int doc = 0; doc < docsPerBatch; doc++) {
            bulk.add(prepareIndex(indexName).setSource("field", "value-first-" + batch + "-" + doc));
        }
        assertNoFailures(bulk.get());
        flush(indexName);
    }
    refresh(indexName);

    final int firstBatchTotalDocs = 2 * docsPerBatch;
    // Wait for leases to advance, then force merge so minRetainedSeqNo advances and first-batch seq_no values are pruned
    assertRetentionLeasesAdvanced(client(), indexName, firstBatchTotalDocs);
    assertThat(indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get().getFailedShards(), equalTo(0));

    refresh(indexName);

    // Both shard copies (primary + replica) should now report pruned seq_no doc values
    assertShardsHaveSeqNoDocValues(indexName, false, 2);

    // Resolve which node hosts the primary and which hosts the replica of shard 0
    var clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
    var primaryShard = clusterState.routingTable().index(indexName).shard(0).primaryShard();
    String primaryNodeName = clusterState.nodes().get(primaryShard.currentNodeId()).getName();
    String replicaNodeName = clusterState.routingTable()
        .index(indexName)
        .shard(0)
        .assignedShards()
        .stream()
        .filter(s -> s.primary() == false)
        .map(s -> clusterState.nodes().get(s.currentNodeId()).getName())
        .findFirst()
        .orElseThrow();

    // Remember the replica's data path so the node can be restarted later with the same on-disk state
    final var replicaDataPathSettings = internalCluster().dataPathSettings(replicaNodeName);
    internalCluster().stopNode(replicaNodeName);

    // Index a third batch while the replica is down — the replica's retention lease is now stale
    var bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    for (int doc = 0; doc < docsPerBatch; doc++) {
        bulk.add(prepareIndex(indexName).setSource("field", "value-second-" + doc));
    }
    assertNoFailures(bulk.get());
    flush(indexName);

    final long totalDocs = firstBatchTotalDocs + docsPerBatch;
    assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);

    // Install a transport hook on the primary that blocks recovery after the first TRANSLOG_OPS
    // batch has been sent, keeping the recovery (and its retention lock) open during the force merge
    CountDownLatch recoveryBlocked = new CountDownLatch(1);
    CountDownLatch unblockRecovery = new CountDownLatch(1);
    AtomicInteger translogOpsCount = new AtomicInteger();
    MockTransportService primaryTransport = MockTransportService.getInstance(primaryNodeName);
    primaryTransport.addSendBehavior((connection, requestId, action, request, options) -> {
        if (PeerRecoveryTargetService.Actions.TRANSLOG_OPS.equals(action)) {
            if (translogOpsCount.incrementAndGet() > 1) {
                // First batch goes through; every subsequent batch parks here until unblocked
                recoveryBlocked.countDown();
                safeAwait(unblockRecovery);
            }
        }
        connection.sendRequest(requestId, action, request, options);
    });

    // Use a tiny chunk size so translog ops are sent in many small batches,
    // allowing us to block recovery mid-way after some operations have already been replayed
    updateClusterSettings(Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey(), ByteSizeValue.ofBytes(100)));

    try {
        internalCluster().startDataOnlyNode(replicaDataPathSettings);
        safeAwait(recoveryBlocked);
        assertThat("at least one translog ops batch should have been sent before blocking", translogOpsCount.get(), greaterThan(1));

        // Force merge on the primary — the retention lock held by the in-progress recovery prevents
        // minRetainedSeqNo from advancing, so the third batch's seq_no doc values are preserved
        var forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
        assertThat(forceMerge.getFailedShards(), equalTo(0));

        refresh(indexName);

        // The primary should still have seq_no doc values for the third batch — the recovery's retention lock
        // prevents minRetainedSeqNo from advancing past the operations the replica still needs
        var primaryIndicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName);
        assertShardsSeqNoDocValuesCount(primaryIndicesService, indexName, docsPerBatch, 1);
    } finally {
        // Always release the blocked recovery and restore transport rules / cluster settings,
        // even if an assertion above failed, so the cluster can return to green
        unblockRecovery.countDown();
        primaryTransport.clearAllRules();
        updateClusterSettings(Settings.builder().putNull(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey()));
    }

    ensureGreen(indexName);
    assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), totalDocs);

    // Index one more doc and flush to guarantee every shard has at least 2 segments, so force merge actually triggers pruning
    indexDoc(indexName, "extra", "field", "value-extra");
    flush(indexName);

    // With the recovery complete and leases advanced again, a final force merge prunes seq_no doc values on both copies
    final long finalTotalDocs = totalDocs + 1;
    assertRetentionLeasesAdvanced(client(), indexName, finalTotalDocs);
    assertThat(indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get().getFailedShards(), equalTo(0));

    refresh(indexName);

    assertHitCount(prepareSearch(indexName).setSize(0).setTrackTotalHits(true), finalTotalDocs);
    assertShardsHaveSeqNoDocValues(indexName, false, 2);
}

public void testSeqNoPrunedAfterMergeWithTsdbCodec() throws Exception {
assumeTrue("requires disable_sequence_numbers feature flag", IndexSettings.DISABLE_SEQUENCE_NUMBERS_FEATURE_FLAG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,37 +112,62 @@ public static void assertShardsSeqNoDocValuesCount(
) {
int checked = 0;
for (IndicesService indicesService : cluster.getDataNodeInstances(IndicesService.class)) {
for (var indexService : indicesService) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
Long count = indexShard.withEngineOrNull(engine -> {
if (engine == null) {
return null;
}
try (var searcher = engine.acquireSearcher("assert_seq_no_count")) {
long total = 0;
for (var leaf : searcher.getLeafContexts()) {
NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
if (seqNoDV != null) {
while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
total++;
}
checked += doAssertShardsSeqNoDocValuesCount(indicesService, indexName, expectedCount);
}
assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards));
}

/**
 * Asserts that every shard of {@code indexName} hosted by the given {@link IndicesService} retains exactly
 * {@code expectedCount} {@code _seq_no} doc values, and that exactly {@code expectedShards} shard(s) were inspected.
 *
 * @param indicesService the {@link IndicesService} instance for the node to check
 * @param indexName      the index to check
 * @param expectedCount  the expected total number of doc values per shard
 * @param expectedShards the exact number of shards expected to be verified
 */
public static void assertShardsSeqNoDocValuesCount(
    IndicesService indicesService,
    String indexName,
    long expectedCount,
    int expectedShards
) {
    final int verifiedShards = doAssertShardsSeqNoDocValuesCount(indicesService, indexName, expectedCount);
    assertThat("expected to verify " + expectedShards + " shard(s)", verifiedShards, equalTo(expectedShards));
}

private static int doAssertShardsSeqNoDocValuesCount(IndicesService indicesService, String indexName, long expectedCount) {
int checked = 0;
for (var indexService : indicesService) {
if (indexService.index().getName().equals(indexName)) {
for (var indexShard : indexService) {
Long count = indexShard.withEngineOrNull(engine -> {
if (engine == null) {
return null;
}
try (var searcher = engine.acquireSearcher("assert_seq_no_count")) {
long total = 0;
for (var leaf : searcher.getLeafContexts()) {
NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
if (seqNoDV != null) {
while (seqNoDV.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
total++;
}
}
return total;
} catch (IOException e) {
throw new AssertionError(e);
}
});
if (count != null) {
assertThat("retained seq_no doc values count", count, equalTo(expectedCount));
checked++;
return total;
} catch (IOException e) {
throw new AssertionError(e);
}
});
if (count != null) {
assertThat("retained seq_no doc values count", count, equalTo(expectedCount));
checked++;
}
}
}
}
assertThat("expected to verify " + expectedShards + " shard(s)", checked, equalTo(expectedShards));
return checked;
}

/**
Expand Down
Loading