7575import org .elasticsearch .index .translog .Translog ;
7676import org .elasticsearch .index .translog .TranslogConfig ;
7777import org .elasticsearch .index .translog .TranslogCorruptedException ;
78+ import org .elasticsearch .index .translog .TranslogDeletionPolicy ;
7879import org .elasticsearch .threadpool .ThreadPool ;
7980
8081import java .io .IOException ;
@@ -127,7 +128,7 @@ public class InternalEngine extends Engine {
127128
128129 private final String uidField ;
129130
130- private final SnapshotDeletionPolicy deletionPolicy ;
131+ private final CombinedDeletionPolicy deletionPolicy ;
131132
132133 // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
133134 // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -147,9 +148,11 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
147148 if (engineConfig .isAutoGeneratedIDsOptimizationEnabled () == false ) {
148149 maxUnsafeAutoIdTimestamp .set (Long .MAX_VALUE );
149150 }
150- deletionPolicy = new SnapshotDeletionPolicy (new KeepOnlyLastCommitDeletionPolicy ());
151151 this .uidField = engineConfig .getIndexSettings ().isSingleType () ? IdFieldMapper .NAME : UidFieldMapper .NAME ;
152152 this .versionMap = new LiveVersionMap ();
153+ final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy ();
154+ this .deletionPolicy = new CombinedDeletionPolicy (
155+ new SnapshotDeletionPolicy (new KeepOnlyLastCommitDeletionPolicy ()), translogDeletionPolicy , openMode );
153156 store .incRef ();
154157 IndexWriter writer = null ;
155158 Translog translog = null ;
@@ -188,7 +191,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
188191 seqNoService = sequenceNumberService (shardId , engineConfig .getIndexSettings (), seqNoStats );
189192 updateMaxUnsafeAutoIdTimestampFromWriter (writer );
190193 indexWriter = writer ;
191- translog = openTranslog (engineConfig , writer , () -> seqNoService ().getGlobalCheckpoint ());
194+ translog = openTranslog (engineConfig , writer , translogDeletionPolicy , () -> seqNoService ().getGlobalCheckpoint ());
192195 assert translog .getGeneration () != null ;
193196 } catch (IOException | TranslogCorruptedException e ) {
194197 throw new EngineCreationFailureException (shardId , "failed to create engine" , e );
@@ -320,29 +323,21 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc
320323 }
321324 }
322325
323- private Translog openTranslog (EngineConfig engineConfig , IndexWriter writer , LongSupplier globalCheckpointSupplier ) throws IOException {
326+ private Translog openTranslog (EngineConfig engineConfig , IndexWriter writer , TranslogDeletionPolicy translogDeletionPolicy , LongSupplier globalCheckpointSupplier ) throws IOException {
324327 assert openMode != null ;
325328 final TranslogConfig translogConfig = engineConfig .getTranslogConfig ();
326- Translog . TranslogGeneration generation = null ;
329+ String translogUUID = null ;
327330 if (openMode == EngineConfig .OpenMode .OPEN_INDEX_AND_TRANSLOG ) {
328- generation = loadTranslogIdFromCommit (writer );
331+ translogUUID = loadTranslogUUIDFromCommit (writer );
329332 // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
330- if (generation == null ) {
331- throw new IllegalStateException ("no translog generation present in commit data but translog is expected to exist" );
332- }
333- if (generation .translogUUID == null ) {
333+ if (translogUUID == null ) {
334334 throw new IndexFormatTooOldException ("translog" , "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first" );
335335 }
336336 }
337- final Translog translog = new Translog (translogConfig , generation , globalCheckpointSupplier );
338- if (generation == null || generation . translogUUID == null ) {
337+ final Translog translog = new Translog (translogConfig , translogUUID , translogDeletionPolicy , globalCheckpointSupplier );
338+ if (translogUUID == null ) {
339339 assert openMode != EngineConfig .OpenMode .OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
340340 + EngineConfig .OpenMode .OPEN_INDEX_AND_TRANSLOG ;
341- if (generation == null ) {
342- logger .debug ("no translog ID present in the current generation - creating one" );
343- } else if (generation .translogUUID == null ) {
344- logger .debug ("upgraded translog to pre 2.0 format, associating translog with index - writing translog UUID" );
345- }
346341 boolean success = false ;
347342 try {
348343 commitIndexWriter (writer , translog , openMode == EngineConfig .OpenMode .OPEN_INDEX_CREATE_TRANSLOG
@@ -368,22 +363,18 @@ public Translog getTranslog() {
368363 * translog id into lucene and returns null.
369364 */
370365 @ Nullable
371- private Translog . TranslogGeneration loadTranslogIdFromCommit (IndexWriter writer ) throws IOException {
366+ private String loadTranslogUUIDFromCommit (IndexWriter writer ) throws IOException {
372367 // commit on a just opened writer will commit even if there are no changes done to it
373368 // we rely on that for the commit data translog id key
374369 final Map <String , String > commitUserData = commitDataAsMap (writer );
375- if (commitUserData .containsKey ("translog_id" )) {
376- assert commitUserData .containsKey (Translog .TRANSLOG_UUID_KEY ) == false : "legacy commit contains translog UUID" ;
377- return new Translog .TranslogGeneration (null , Long .parseLong (commitUserData .get ("translog_id" )));
378- } else if (commitUserData .containsKey (Translog .TRANSLOG_GENERATION_KEY )) {
379- if (commitUserData .containsKey (Translog .TRANSLOG_UUID_KEY ) == false ) {
380- throw new IllegalStateException ("commit doesn't contain translog UUID" );
370+ if (commitUserData .containsKey (Translog .TRANSLOG_UUID_KEY )) {
371+ if (commitUserData .containsKey (Translog .TRANSLOG_GENERATION_KEY ) == false ) {
372+ throw new IllegalStateException ("commit doesn't contain translog generation id" );
381373 }
382- final String translogUUID = commitUserData .get (Translog .TRANSLOG_UUID_KEY );
383- final long translogGen = Long . parseLong ( commitUserData . get ( Translog . TRANSLOG_GENERATION_KEY ));
384- return new Translog . TranslogGeneration ( translogUUID , translogGen ) ;
374+ return commitUserData .get (Translog .TRANSLOG_UUID_KEY );
375+ } else {
376+ return null ;
385377 }
386- return null ;
387378 }
388379
389380 private SearcherManager createSearcherManager () throws EngineException {
@@ -1269,14 +1260,13 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
12691260 if (indexWriter .hasUncommittedChanges () || force ) {
12701261 ensureCanFlush ();
12711262 try {
1272- translog .prepareCommit ();
1263+ translog .rollGeneration ();
12731264 logger .trace ("starting commit for flush; commitTranslog=true" );
1274- final long committedGeneration = commitIndexWriter (indexWriter , translog , null );
1265+ commitIndexWriter (indexWriter , translog , null );
12751266 logger .trace ("finished commit for flush" );
12761267 // we need to refresh in order to clear older version values
12771268 refresh ("version_table_flush" );
1278- // after refresh documents can be retrieved from the index so we can now commit the translog
1279- translog .commit (committedGeneration );
1269+ translog .trimUnreferencedReaders ();
12801270 } catch (Exception e ) {
12811271 throw new FlushFailedEngineException (shardId , e );
12821272 }
@@ -1428,9 +1418,8 @@ public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws Engine
14281418 logger .trace ("finish flush for snapshot" );
14291419 }
14301420 try (ReleasableLock lock = readLock .acquire ()) {
1431- ensureOpen ();
14321421 logger .trace ("pulling snapshot" );
1433- return new IndexCommitRef (deletionPolicy );
1422+ return new IndexCommitRef (deletionPolicy . getIndexDeletionPolicy () );
14341423 } catch (IOException e ) {
14351424 throw new SnapshotFailedEngineException (shardId , e );
14361425 }
@@ -1781,10 +1770,9 @@ protected void doRun() throws Exception {
17811770 * @param writer the index writer to commit
17821771 * @param translog the translog
17831772 * @param syncId the sync flush ID ({@code null} if not committing a synced flush)
1784- * @return the minimum translog generation for the local checkpoint committed with the specified index writer
17851773 * @throws IOException if an I/O exception occurs committing the specfied writer
17861774 */
1787- private long commitIndexWriter (final IndexWriter writer , final Translog translog , @ Nullable final String syncId ) throws IOException {
1775+ private void commitIndexWriter (final IndexWriter writer , final Translog translog , @ Nullable final String syncId ) throws IOException {
17881776 ensureCanFlush ();
17891777 try {
17901778 final long localCheckpoint = seqNoService ().getLocalCheckpoint ();
@@ -1817,7 +1805,6 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog
18171805 });
18181806
18191807 writer .commit ();
1820- return translogGeneration .translogFileGeneration ;
18211808 } catch (final Exception ex ) {
18221809 try {
18231810 failEngine ("lucene commit failed" , ex );
0 commit comments