2222import org .apache .logging .log4j .Logger ;
2323import org .apache .logging .log4j .message .ParameterizedMessage ;
2424import org .apache .lucene .document .Field ;
25- import org .apache .lucene .document .LongPoint ;
2625import org .apache .lucene .document .NumericDocValuesField ;
2726import org .apache .lucene .index .DirectoryReader ;
2827import org .apache .lucene .index .IndexCommit ;
3837import org .apache .lucene .index .SoftDeletesRetentionMergePolicy ;
3938import org .apache .lucene .index .Term ;
4039import org .apache .lucene .search .IndexSearcher ;
41- import org .apache .lucene .search .Query ;
4240import org .apache .lucene .search .ReferenceManager ;
4341import org .apache .lucene .search .SearcherFactory ;
4442import org .apache .lucene .search .SearcherManager ;
@@ -157,6 +155,7 @@ public class InternalEngine extends Engine {
157155 private final CounterMetric numDocUpdates = new CounterMetric ();
158156 private final NumericDocValuesField softDeleteField = Lucene .newSoftDeleteField ();
159157 private final boolean softDeleteEnabled ;
158+ private final SoftDeletesPolicy softDeletesPolicy ;
160159 private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener ;
161160
162161 /**
@@ -182,7 +181,6 @@ public InternalEngine(EngineConfig engineConfig) {
182181 maxUnsafeAutoIdTimestamp .set (Long .MAX_VALUE );
183182 }
184183 this .uidField = engineConfig .getIndexSettings ().isSingleType () ? IdFieldMapper .NAME : UidFieldMapper .NAME ;
185- this .softDeleteEnabled = engineConfig .getIndexSettings ().isSoftDeleteEnabled ();
186184 final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy (
187185 engineConfig .getIndexSettings ().getTranslogRetentionSize ().getBytes (),
188186 engineConfig .getIndexSettings ().getTranslogRetentionAge ().getMillis ()
@@ -204,8 +202,10 @@ public InternalEngine(EngineConfig engineConfig) {
204202 assert translog .getGeneration () != null ;
205203 this .translog = translog ;
206204 this .localCheckpointTracker = createLocalCheckpointTracker (localCheckpointTrackerSupplier );
205+ this .softDeleteEnabled = engineConfig .getIndexSettings ().isSoftDeleteEnabled ();
206+ this .softDeletesPolicy = newSoftDeletesPolicy ();
207207 this .combinedDeletionPolicy =
208- new CombinedDeletionPolicy (logger , translogDeletionPolicy , translog ::getLastSyncedGlobalCheckpoint );
208+ new CombinedDeletionPolicy (logger , translogDeletionPolicy , softDeletesPolicy , translog ::getLastSyncedGlobalCheckpoint );
209209 writer = createWriter ();
210210 bootstrapAppendOnlyInfoFromWriter (writer );
211211 historyUUID = loadHistoryUUID (writer );
@@ -262,6 +262,18 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
262262 return localCheckpointTrackerSupplier .apply (maxSeqNo , localCheckpoint );
263263 }
264264
265+ private SoftDeletesPolicy newSoftDeletesPolicy () throws IOException {
266+ final Map <String , String > commitUserData = store .readLastCommittedSegmentsInfo ().userData ;
267+ final long lastMinRetainedSeqNo ;
268+ if (commitUserData .containsKey (Engine .MIN_RETAINED_SEQNO )) {
269+ lastMinRetainedSeqNo = Long .parseLong (commitUserData .get (Engine .MIN_RETAINED_SEQNO ));
270+ } else {
271+ lastMinRetainedSeqNo = Long .parseLong (commitUserData .get (SequenceNumbers .MAX_SEQ_NO )) + 1 ;
272+ }
273+ return new SoftDeletesPolicy (translog ::getLastSyncedGlobalCheckpoint , lastMinRetainedSeqNo ,
274+ engineConfig .getIndexSettings ().getSoftDeleteRetentionOperations ());
275+ }
276+
265277 /**
266278 * This reference manager delegates all it's refresh calls to another (internal) SearcherManager
267279 * The main purpose for this is that if we have external refreshes happening we don't issue extra
@@ -481,18 +493,39 @@ public void syncTranslog() throws IOException {
481493 }
482494
483495 @ Override
484- public Closeable acquireTranslogRetentionLock () {
485- return getTranslog ().acquireRetentionLock ( );
496+ public Translog . Snapshot newTranslogSnapshotBetween ( long minSeqNo , long maxSeqNo ) throws IOException {
497+ return getTranslog ().getSnapshotBetween ( minSeqNo , maxSeqNo );
486498 }
487499
500+ /**
501+ * Creates a new history snapshot for reading operations since the provided seqno.
502+ * The returned snapshot can be retrieved from either Lucene index or translog files.
503+ */
488504 @ Override
489- public Translog .Snapshot newTranslogSnapshotBetween (long minSeqNo , long maxSeqNo ) throws IOException {
490- return getTranslog ().getSnapshotBetween (minSeqNo , maxSeqNo );
505+ public Translog .Snapshot readHistoryOperations (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
506+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
507+ return newLuceneChangesSnapshot (source , mapperService , Math .max (0 , startingSeqNo ), Long .MAX_VALUE , false );
508+ } else {
509+ return getTranslog ().getSnapshotBetween (startingSeqNo , Long .MAX_VALUE );
510+ }
491511 }
492512
513+ /**
514+ * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
515+ */
493516 @ Override
494- public int estimateTranslogOperationsFromMinSeq (long minSeqNo ) {
495- return getTranslog ().estimateTotalOperationsFromMinSeq (minSeqNo );
517+ public int estimateNumberOfHistoryOperations (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
518+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
519+ try (Translog .Snapshot snapshot =
520+ newLuceneChangesSnapshot (source , mapperService , Math .max (0 , startingSeqNo ), Long .MAX_VALUE , false )) {
521+ return snapshot .totalOperations ();
522+ } catch (IOException ex ) {
523+ maybeFailEngine (source , ex );
524+ throw ex ;
525+ }
526+ } else {
527+ return getTranslog ().estimateTotalOperationsFromMinSeq (startingSeqNo );
528+ }
496529 }
497530
498531 @ Override
@@ -2127,8 +2160,8 @@ private IndexWriterConfig getIndexWriterConfig() {
21272160 MergePolicy mergePolicy = config ().getMergePolicy ();
21282161 if (softDeleteEnabled ) {
21292162 iwc .setSoftDeletesField (Lucene .SOFT_DELETE_FIELD );
2130- mergePolicy = new RecoverySourcePruneMergePolicy (SourceFieldMapper .RECOVERY_SOURCE_NAME , this :: softDeletesRetentionQuery ,
2131- new SoftDeletesRetentionMergePolicy (Lucene .SOFT_DELETE_FIELD , this :: softDeletesRetentionQuery , mergePolicy ));
2163+ mergePolicy = new RecoverySourcePruneMergePolicy (SourceFieldMapper .RECOVERY_SOURCE_NAME , softDeletesPolicy :: getRetentionQuery ,
2164+ new SoftDeletesRetentionMergePolicy (Lucene .SOFT_DELETE_FIELD , softDeletesPolicy :: getRetentionQuery , mergePolicy ));
21322165 }
21332166 iwc .setMergePolicy (new ElasticsearchMergePolicy (mergePolicy ));
21342167 iwc .setSimilarity (engineConfig .getSimilarity ());
@@ -2141,20 +2174,6 @@ private IndexWriterConfig getIndexWriterConfig() {
21412174 return iwc ;
21422175 }
21432176
2144- /**
2145- * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
2146- */
2147- private Query softDeletesRetentionQuery () {
2148- ensureOpen ();
2149- // TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
2150- final long retainedExtraOps = engineConfig .getIndexSettings ().getSoftDeleteRetentionOperations ();
2151- // Prefer using the global checkpoint which is persisted on disk than an in-memory value.
2152- // If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
2153- // then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
2154- final long persistedGlobalCheckpoint = translog .getLastSyncedGlobalCheckpoint ();
2155- return LongPoint .newRangeQuery (SeqNoFieldMapper .NAME , persistedGlobalCheckpoint + 1 - retainedExtraOps , Long .MAX_VALUE );
2156- }
2157-
21582177 /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
21592178 static final class SearchFactory extends EngineSearcherFactory {
21602179 private final Engine .Warmer warmer ;
@@ -2341,6 +2360,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
23412360 commitData .put (SequenceNumbers .MAX_SEQ_NO , Long .toString (localCheckpointTracker .getMaxSeqNo ()));
23422361 commitData .put (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID , Long .toString (maxUnsafeAutoIdTimestamp .get ()));
23432362 commitData .put (HISTORY_UUID_KEY , historyUUID );
2363+ if (softDeleteEnabled ) {
2364+ commitData .put (Engine .MIN_RETAINED_SEQNO , Long .toString (softDeletesPolicy .getMinRetainedSeqNo ()));
2365+ }
23442366 logger .trace ("committing writer with commit data [{}]" , commitData );
23452367 return commitData .entrySet ().iterator ();
23462368 });
@@ -2396,6 +2418,8 @@ public void onSettingsChanged() {
23962418 final IndexSettings indexSettings = engineConfig .getIndexSettings ();
23972419 translogDeletionPolicy .setRetentionAgeInMillis (indexSettings .getTranslogRetentionAge ().getMillis ());
23982420 translogDeletionPolicy .setRetentionSizeInBytes (indexSettings .getTranslogRetentionSize ().getBytes ());
2421+
2422+ softDeletesPolicy .setRetentionOperations (indexSettings .getSoftDeleteRetentionOperations ());
23992423 }
24002424
24012425 public MergeStats getMergeStats () {
@@ -2509,6 +2533,41 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m
25092533 }
25102534 }
25112535
2536+ @ Override
2537+ public boolean hasCompleteOperationHistory (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
2538+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
2539+ return getMinRetainedSeqNo () <= startingSeqNo ;
2540+ } else {
2541+ final long currentLocalCheckpoint = getLocalCheckpointTracker ().getCheckpoint ();
2542+ final LocalCheckpointTracker tracker = new LocalCheckpointTracker (startingSeqNo , startingSeqNo - 1 );
2543+ try (Translog .Snapshot snapshot = getTranslog ().getSnapshotBetween (startingSeqNo , Long .MAX_VALUE )) {
2544+ Translog .Operation operation ;
2545+ while ((operation = snapshot .next ()) != null ) {
2546+ if (operation .seqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO ) {
2547+ tracker .markSeqNoAsCompleted (operation .seqNo ());
2548+ }
2549+ }
2550+ }
2551+ return tracker .getCheckpoint () >= currentLocalCheckpoint ;
2552+ }
2553+ }
2554+
2555+ /**
2556+ * Returns the minimum seqno that is retained in the Lucene index.
2557+ * Operations whose seq# are at least this value should exist in the Lucene index.
2558+ */
2559+ final long getMinRetainedSeqNo () {
2560+ assert softDeleteEnabled : Thread .currentThread ().getName ();
2561+ return softDeletesPolicy .getMinRetainedSeqNo ();
2562+ }
2563+
2564+ @ Override
2565+ public Closeable acquireRetentionLockForPeerRecovery () {
2566+ final Closeable translogLock = translog .acquireRetentionLock ();
2567+ final Releasable softDeletesLock = softDeletesPolicy .acquireRetentionLock ();
2568+ return () -> IOUtils .close (translogLock , softDeletesLock );
2569+ }
2570+
25122571 @ Override
25132572 public boolean isRecovering () {
25142573 return pendingTranslogRecovery .get ();
0 commit comments