-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Use Lucene soft-deletes in peer recovery #30522
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 43 commits
b18eb06
4b2e385
176b497
9ae627c
ff2215c
188138d
6d901bf
8a78f65
6fe8847
cc2b3f0
0612a05
a61d00b
f3f1fa2
65b8458
fc3d7d1
bd1b8ac
b1e73aa
f7ea71c
04112c6
e34154a
1531024
86c3eba
b3d0d5f
8320647
6b95e21
3be0e30
ca3f781
65ede0b
a0e58b9
c1e03d1
c5ba76f
33be718
c717bf7
df132e1
1bcd443
f232c8a
591d521
76a035f
78c0d92
c30de4a
5ff18f9
8a37126
88950b3
dbe0472
f9eeb90
ccb6e80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,6 @@ | |
| import org.apache.logging.log4j.Logger; | ||
| import org.apache.logging.log4j.message.ParameterizedMessage; | ||
| import org.apache.lucene.document.Field; | ||
| import org.apache.lucene.document.LongPoint; | ||
| import org.apache.lucene.document.NumericDocValuesField; | ||
| import org.apache.lucene.index.DirectoryReader; | ||
| import org.apache.lucene.index.IndexCommit; | ||
|
|
@@ -38,7 +37,6 @@ | |
| import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; | ||
| import org.apache.lucene.index.Term; | ||
| import org.apache.lucene.search.IndexSearcher; | ||
| import org.apache.lucene.search.Query; | ||
| import org.apache.lucene.search.ReferenceManager; | ||
| import org.apache.lucene.search.SearcherFactory; | ||
| import org.apache.lucene.search.SearcherManager; | ||
|
|
@@ -153,6 +151,7 @@ public class InternalEngine extends Engine { | |
| private final CounterMetric numDocUpdates = new CounterMetric(); | ||
| private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField(); | ||
| private final boolean softDeleteEnabled; | ||
| private final SoftDeletesPolicy softDeletesPolicy; | ||
| private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; | ||
|
|
||
| /** | ||
|
|
@@ -177,7 +176,6 @@ public InternalEngine(EngineConfig engineConfig) { | |
| if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { | ||
| maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); | ||
| } | ||
| this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); | ||
| final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( | ||
| engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), | ||
| engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() | ||
|
|
@@ -199,8 +197,10 @@ public InternalEngine(EngineConfig engineConfig) { | |
| assert translog.getGeneration() != null; | ||
| this.translog = translog; | ||
| this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); | ||
| this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); | ||
| this.softDeletesPolicy = newSoftDeletesPolicy(); | ||
| this.combinedDeletionPolicy = | ||
| new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint); | ||
| new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint); | ||
| writer = createWriter(); | ||
| bootstrapAppendOnlyInfoFromWriter(writer); | ||
| historyUUID = loadHistoryUUID(writer); | ||
|
|
@@ -257,6 +257,18 @@ private LocalCheckpointTracker createLocalCheckpointTracker( | |
| return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); | ||
| } | ||
|
|
||
| private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { | ||
| final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().userData; | ||
| final long lastSeqNoSeenByMergePolicy; | ||
| if (commitUserData.containsKey(Engine.MIN_RETAINED_SEQNO)) { | ||
| lastSeqNoSeenByMergePolicy = Long.parseLong(commitUserData.get(Engine.MIN_RETAINED_SEQNO)); | ||
| } else { | ||
| lastSeqNoSeenByMergePolicy = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; | ||
| } | ||
| return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastSeqNoSeenByMergePolicy, | ||
| engineConfig.getIndexSettings().getSoftDeleteRetentionOperations()); | ||
| } | ||
|
|
||
| /** | ||
| * This reference manager delegates all it's refresh calls to another (internal) SearcherManager | ||
| * The main purpose for this is that if we have external refreshes happening we don't issue extra | ||
|
|
@@ -468,18 +480,39 @@ public void syncTranslog() throws IOException { | |
| } | ||
|
|
||
| @Override | ||
| public Closeable acquireTranslogRetentionLock() { | ||
| return getTranslog().acquireRetentionLock(); | ||
| public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can remove this as a follow up, in favor of readHistoryOperations ?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we will remove this method after making primary-replica resync use Lucene. |
||
| return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new history snapshot for reading operations since the provided seqno. | ||
| * The returned snapshot can be retrieved from either Lucene index or translog files. | ||
| */ | ||
| @Override | ||
| public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { | ||
| return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo); | ||
| public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { | ||
| if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { | ||
| return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); | ||
| } else { | ||
| return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine. | ||
| */ | ||
| @Override | ||
| public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { | ||
| return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); | ||
| public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { | ||
| if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { | ||
| try (Translog.Snapshot snapshot = | ||
| newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)) { | ||
| return snapshot.totalOperations(); | ||
| } catch (IOException ex) { | ||
| maybeFailEngine(source, ex); | ||
| throw ex; | ||
| } | ||
| } else { | ||
| return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -2070,8 +2103,8 @@ private IndexWriterConfig getIndexWriterConfig() { | |
| MergePolicy mergePolicy = config().getMergePolicy(); | ||
| if (softDeleteEnabled) { | ||
| iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD); | ||
| mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::softDeletesRetentionQuery, | ||
| new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy)); | ||
| mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, | ||
| new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy)); | ||
| } | ||
| iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); | ||
| iwc.setSimilarity(engineConfig.getSimilarity()); | ||
|
|
@@ -2084,20 +2117,6 @@ private IndexWriterConfig getIndexWriterConfig() { | |
| return iwc; | ||
| } | ||
|
|
||
| /** | ||
| * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges. | ||
| */ | ||
| private Query softDeletesRetentionQuery() { | ||
| ensureOpen(); | ||
| // TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit. | ||
| final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(); | ||
| // Prefer using the global checkpoint which is persisted on disk than an in-memory value. | ||
| // If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops, | ||
| // then we may not have all required operations whose seq# greater than the global checkpoint after restarted. | ||
| final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); | ||
| return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE); | ||
| } | ||
|
|
||
| /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */ | ||
| static final class SearchFactory extends EngineSearcherFactory { | ||
| private final Engine.Warmer warmer; | ||
|
|
@@ -2284,6 +2303,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl | |
| commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); | ||
| commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); | ||
| commitData.put(HISTORY_UUID_KEY, historyUUID); | ||
| if (softDeleteEnabled) { | ||
| commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); | ||
| } | ||
| logger.trace("committing writer with commit data [{}]", commitData); | ||
| return commitData.entrySet().iterator(); | ||
| }); | ||
|
|
@@ -2339,6 +2361,8 @@ public void onSettingsChanged() { | |
| final IndexSettings indexSettings = engineConfig.getIndexSettings(); | ||
| translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); | ||
| translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); | ||
|
|
||
| softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations()); | ||
| } | ||
|
|
||
| public MergeStats getMergeStats() { | ||
|
|
@@ -2452,6 +2476,41 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { | ||
| if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { | ||
| return getMinRetainedSeqNo() <= startingSeqNo; | ||
| } else { | ||
| final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); | ||
| final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); | ||
| try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) { | ||
| Translog.Operation operation; | ||
| while ((operation = snapshot.next()) != null) { | ||
| if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
| tracker.markSeqNoAsCompleted(operation.seqNo()); | ||
| } | ||
| } | ||
| } | ||
| return tracker.getCheckpoint() >= currentLocalCheckpoint; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the minimum seqno that is retained in the Lucene index. | ||
| * Operations whose seq# are at least this value should exist in the Lucene index. | ||
| */ | ||
| final long getMinRetainedSeqNo() { | ||
| assert softDeleteEnabled : Thread.currentThread().getName(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting choice - logging the thread name :) |
||
| return softDeletesPolicy.getMinRetainedSeqNo(); | ||
| } | ||
|
|
||
| @Override | ||
| public Closeable acquireRetentionLockForPeerRecovery() { | ||
| final Closeable translogLock = translog.acquireRetentionLock(); | ||
| final Releasable softDeletesLock = softDeletesPolicy.acquireRetentionLock(); | ||
| return () -> IOUtils.close(translogLock, softDeletesLock); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isRecovering() { | ||
| return pendingTranslogRecovery.get(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to minRetainedSeqNo ?