Skip to content

Commit 6e2f7b4

Browse files
authored
Use Lucene index in peer recovery and resync (#51189)
We can use the Lucene index exclusively in peer recoveries and primary-replica resync in 8.0. Relates #50775
1 parent 73991da commit 6e2f7b4

File tree

15 files changed

+57
-276
lines changed

15 files changed

+57
-276
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,12 @@ public void testRecovery() throws Exception {
667667
* an index without a translog so we randomize whether
668668
* or not we have one. */
669669
shouldHaveTranslog = randomBoolean();
670-
670+
Settings.Builder settings = Settings.builder();
671+
if (minimumNodeVersion().before(Version.V_8_0_0) && randomBoolean()) {
672+
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
673+
}
674+
final String mappings = randomBoolean() ? "\"_source\": { \"enabled\": false}" : null;
675+
createIndex(index, settings.build(), mappings);
671676
indexRandomDocuments(count, true, true, i -> jsonBuilder().startObject().field("field", "value").endObject());
672677

673678
// make sure all recoveries are done
@@ -1267,7 +1272,8 @@ public void testOperationBasedRecovery() throws Exception {
12671272
if (minimumNodeVersion().before(Version.V_8_0_0) && randomBoolean()) {
12681273
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
12691274
}
1270-
createIndex(index, settings.build());
1275+
final String mappings = randomBoolean() ? "\"_source\": { \"enabled\": false}" : null;
1276+
createIndex(index, settings.build(), mappings);
12711277
ensureGreen(index);
12721278
int committedDocs = randomIntBetween(100, 200);
12731279
for (int i = 0; i < committedDocs; i++) {
@@ -1325,7 +1331,8 @@ public void testResize() throws Exception {
13251331
if (minimumNodeVersion().before(Version.V_8_0_0) && randomBoolean()) {
13261332
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false);
13271333
}
1328-
createIndex(index, settings.build());
1334+
final String mappings = randomBoolean() ? "\"_source\": { \"enabled\": false}" : null;
1335+
createIndex(index, settings.build(), mappings);
13291336
numDocs = randomIntBetween(10, 1000);
13301337
for (int i = 0; i < numDocs; i++) {
13311338
indexDocument(Integer.toString(i));

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,8 @@ public void testOperationBasedRecovery() throws Exception {
645645
if (minimumNodeVersion().before(Version.V_8_0_0) && randomBoolean()) {
646646
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
647647
}
648-
createIndex(index, settings.build());
648+
final String mappings = randomBoolean() ? "\"_source\": { \"enabled\": false}" : null;
649+
createIndex(index, settings.build(), mappings);
649650
ensureGreen(index);
650651
indexDocs(index, 0, randomIntBetween(100, 200));
651652
flush(index, randomBoolean());

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ public enum SearcherScope {
730730
/**
731731
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
732732
*/
733-
public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource);
733+
public abstract Closeable acquireHistoryRetentionLock();
734734

735735
/**
736736
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@@ -739,24 +739,10 @@ public enum SearcherScope {
739739
public abstract Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
740740
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;
741741

742-
/**
743-
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
744-
* The returned snapshot can be retrieved from either Lucene index or translog files.
745-
*/
746-
public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
747-
MapperService mapperService, long startingSeqNo) throws IOException;
748-
749-
/**
750-
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
751-
*/
752-
public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
753-
MapperService mapperService, long startingSeqNo) throws IOException;
754-
755742
/**
756743
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)
757744
*/
758-
public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
759-
MapperService mapperService, long startingSeqNo) throws IOException;
745+
public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);
760746

761747
/**
762748
* Gets the minimum retained sequence number for this engine.
@@ -1932,11 +1918,4 @@ public interface TranslogRecoveryRunner {
19321918
* to advance this marker to at least the given sequence number.
19331919
*/
19341920
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
1935-
1936-
/**
1937-
* Whether we should read history operations from translog or Lucene index
1938-
*/
1939-
public enum HistorySource {
1940-
TRANSLOG, INDEX
1941-
}
19421921
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -526,36 +526,6 @@ public void syncTranslog() throws IOException {
526526
revisitIndexDeletionPolicyOnTranslogSynced();
527527
}
528528

529-
/**
530-
* Creates a new history snapshot for reading operations since the provided seqno.
531-
* The returned snapshot can be retrieved from either Lucene index or translog files.
532-
*/
533-
@Override
534-
public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
535-
MapperService mapperService, long startingSeqNo) throws IOException {
536-
if (historySource == HistorySource.INDEX) {
537-
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
538-
} else {
539-
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
540-
}
541-
}
542-
543-
/**
544-
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
545-
*/
546-
@Override
547-
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
548-
MapperService mapperService, long startingSeqNo) throws IOException {
549-
if (historySource == HistorySource.INDEX) {
550-
try (Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo),
551-
Long.MAX_VALUE, false)) {
552-
return snapshot.totalOperations();
553-
}
554-
} else {
555-
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
556-
}
557-
}
558-
559529
@Override
560530
public TranslogStats getTranslogStats() {
561531
return getTranslog().stats();
@@ -2597,27 +2567,8 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
25972567
}
25982568

25992569
@Override
2600-
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
2601-
MapperService mapperService, long startingSeqNo) throws IOException {
2602-
if (historySource == HistorySource.INDEX) {
2603-
return getMinRetainedSeqNo() <= startingSeqNo;
2604-
} else {
2605-
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
2606-
// avoid scanning translog if not necessary
2607-
if (startingSeqNo > currentLocalCheckpoint) {
2608-
return true;
2609-
}
2610-
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
2611-
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
2612-
Translog.Operation operation;
2613-
while ((operation = snapshot.next()) != null) {
2614-
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
2615-
tracker.markSeqNoAsProcessed(operation.seqNo());
2616-
}
2617-
}
2618-
}
2619-
return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint;
2620-
}
2570+
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
2571+
return getMinRetainedSeqNo() <= startingSeqNo;
26212572
}
26222573

26232574
/**
@@ -2629,12 +2580,8 @@ public final long getMinRetainedSeqNo() {
26292580
}
26302581

26312582
@Override
2632-
public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
2633-
if (historySource == HistorySource.INDEX) {
2634-
return softDeletesPolicy.acquireRetentionLock();
2635-
} else {
2636-
return translog.acquireRetentionLock();
2637-
}
2583+
public Closeable acquireHistoryRetentionLock() {
2584+
return softDeletesPolicy.acquireRetentionLock();
26382585
}
26392586

26402587
/**

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void syncTranslog() {
304304
}
305305

306306
@Override
307-
public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
307+
public Closeable acquireHistoryRetentionLock() {
308308
return () -> {};
309309
}
310310

@@ -315,20 +315,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
315315
}
316316

317317
@Override
318-
public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
319-
MapperService mapperService, long startingSeqNo) {
320-
return newEmptySnapshot();
321-
}
322-
323-
@Override
324-
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
325-
MapperService mapperService, long startingSeqNo) {
326-
return 0;
327-
}
328-
329-
@Override
330-
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
331-
MapperService mapperService, long startingSeqNo) {
318+
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
332319
// we can do operation-based recovery if we don't have to replay any operation.
333320
return startingSeqNo > seqNoStats.getMaxSeqNo();
334321
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,31 +1918,16 @@ protected void doRun() {
19181918
/**
19191919
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
19201920
*/
1921-
public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) {
1922-
return getEngine().acquireHistoryRetentionLock(source);
1923-
}
1924-
1925-
/**
1926-
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
1927-
*/
1928-
public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
1929-
return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo);
1930-
}
1931-
1932-
/**
1933-
* Creates a new history snapshot for reading operations since the provided starting seqno (inclusive).
1934-
* The returned snapshot can be retrieved from either Lucene index or translog files.
1935-
*/
1936-
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
1937-
return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
1921+
public Closeable acquireHistoryRetentionLock() {
1922+
return getEngine().acquireHistoryRetentionLock();
19381923
}
19391924

19401925
/**
19411926
* Checks if we have a complete history of operations since the given starting seqno (inclusive).
1942-
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
1927+
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
19431928
*/
1944-
public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
1945-
return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo);
1929+
public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
1930+
return getEngine().hasCompleteOperationHistory(reason, startingSeqNo);
19461931
}
19471932

19481933
/**
@@ -2131,7 +2116,7 @@ public RetentionLease addRetentionLease(
21312116
assert assertPrimaryMode();
21322117
verifyNotClosed();
21332118
ensureSoftDeletesEnabled("retention leases");
2134-
try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
2119+
try (Closeable ignore = acquireHistoryRetentionLock()) {
21352120
final long actualRetainingSequenceNumber =
21362121
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
21372122
return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener);
@@ -2153,7 +2138,7 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS
21532138
assert assertPrimaryMode();
21542139
verifyNotClosed();
21552140
ensureSoftDeletesEnabled("retention leases");
2156-
try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
2141+
try (Closeable ignore = acquireHistoryRetentionLock()) {
21572142
final long actualRetainingSequenceNumber =
21582143
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
21592144
return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source);

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3737
import org.elasticsearch.common.xcontent.XContentBuilder;
3838
import org.elasticsearch.core.internal.io.IOUtils;
39-
import org.elasticsearch.index.engine.Engine;
4039
import org.elasticsearch.index.seqno.SequenceNumbers;
4140
import org.elasticsearch.index.translog.Translog;
4241
import org.elasticsearch.tasks.Task;
@@ -86,14 +85,13 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
8685
Translog.Snapshot snapshot = null;
8786
try {
8887
final long startingSeqNo = indexShard.getLastKnownGlobalCheckpoint() + 1;
88+
assert startingSeqNo >= 0 : "startingSeqNo must be non-negative; got [" + startingSeqNo + "]";
8989
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
9090
final ShardId shardId = indexShard.shardId();
9191
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
9292
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
9393
// Also fail the resync early if the shard is shutting down
94-
snapshot = indexShard.getHistoryOperations("resync",
95-
indexShard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
96-
startingSeqNo);
94+
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
9795
final Translog.Snapshot originalSnapshot = snapshot;
9896
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
9997
@Override

0 commit comments

Comments
 (0)