diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java new file mode 100644 index 0000000000000..68e2865e28452 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; + +import java.io.IOException; +import java.util.List; + +/** + * An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files, + * making sure that all translog files that are needed to recover from the Lucene commit are not deleted. + */ +class CombinedDeletionPolicy extends IndexDeletionPolicy { + + private final TranslogDeletionPolicy translogDeletionPolicy; + private final EngineConfig.OpenMode openMode; + + private final SnapshotDeletionPolicy indexDeletionPolicy; + + CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy, + EngineConfig.OpenMode openMode) { + this.indexDeletionPolicy = indexDeletionPolicy; + this.translogDeletionPolicy = translogDeletionPolicy; + this.openMode = openMode; + } + + @Override + public void onInit(List commits) throws IOException { + indexDeletionPolicy.onInit(commits); + switch (openMode) { + case CREATE_INDEX_AND_TRANSLOG: + assert commits.isEmpty() : "index is being created but we already have commits"; + break; + case OPEN_INDEX_CREATE_TRANSLOG: + assert commits.isEmpty() == false : "index is opened, but we have no commits"; + break; + case OPEN_INDEX_AND_TRANSLOG: + assert commits.isEmpty() == false : "index is opened, but we have no commits"; + setLastCommittedTranslogGeneration(commits); + break; + default: + throw new IllegalArgumentException("unknown openMode [" + openMode + "]"); + } + } + + @Override + public void onCommit(List commits) throws IOException { + indexDeletionPolicy.onCommit(commits); + setLastCommittedTranslogGeneration(commits); + } + + private void setLastCommittedTranslogGeneration(List commits) throws IOException { + // when opening an existing lucene index, we currently always open the last commit. + // we therefore use the translog gen as the one that will be required for recovery + final IndexCommit indexCommit = commits.get(commits.size() - 1); + assert indexCommit.isDeleted() == false : "last commit is deleted"; + long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen); + } + + public SnapshotDeletionPolicy getIndexDeletionPolicy() { + return indexDeletionPolicy; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 40c5c0af5e3f6..0602fb2768377 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -75,6 +75,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -127,7 +128,7 @@ public class InternalEngine extends Engine { private final String uidField; - private final SnapshotDeletionPolicy deletionPolicy; + private final CombinedDeletionPolicy deletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling @@ -147,9 +148,11 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } - deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; this.versionMap = new LiveVersionMap(); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); + this.deletionPolicy = new CombinedDeletionPolicy( + new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -188,7 +191,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); indexWriter = writer; - translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint()); + translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -320,29 +323,21 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc } } - private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, LongSupplier globalCheckpointSupplier) throws IOException { + private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException { assert openMode != null; final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - Translog.TranslogGeneration generation = null; + String translogUUID = null; if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - generation = loadTranslogIdFromCommit(writer); + translogUUID = loadTranslogUUIDFromCommit(writer); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! - if (generation == null) { - throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); - } - if (generation.translogUUID == null) { + if (translogUUID == null) { throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier); - if (generation == null || generation.translogUUID == null) { + final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); + if (translogUUID == null) { assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - if (generation == null) { - logger.debug("no translog ID present in the current generation - creating one"); - } else if (generation.translogUUID == null) { - logger.debug("upgraded translog to pre 2.0 format, associating translog with index - writing translog UUID"); - } boolean success = false; try { commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG @@ -368,22 +363,18 @@ public Translog getTranslog() { * translog id into lucene and returns null. */ @Nullable - private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException { + private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException { // commit on a just opened writer will commit even if there are no changes done to it // we rely on that for the commit data translog id key final Map commitUserData = commitDataAsMap(writer); - if (commitUserData.containsKey("translog_id")) { - assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID"; - return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id"))); - } else if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY)) { - if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog UUID"); + if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY)) { + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog generation id"); } - final String translogUUID = commitUserData.get(Translog.TRANSLOG_UUID_KEY); - final long translogGen = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - return new Translog.TranslogGeneration(translogUUID, translogGen); + return commitUserData.get(Translog.TRANSLOG_UUID_KEY); + } else { + return null; } - return null; } private SearcherManager createSearcherManager() throws EngineException { @@ -1269,14 +1260,13 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti if (indexWriter.hasUncommittedChanges() || force) { ensureCanFlush(); try { - translog.prepareCommit(); + translog.rollGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); - final long committedGeneration = commitIndexWriter(indexWriter, translog, null); + commitIndexWriter(indexWriter, translog, null); logger.trace("finished commit for flush"); // we need to refresh in order to clear older version values refresh("version_table_flush"); - // after refresh documents can be retrieved from the index so we can now commit the translog - translog.commit(committedGeneration); + translog.trimUnreferencedReaders(); } catch (Exception e) { throw new FlushFailedEngineException(shardId, e); } @@ -1428,9 +1418,8 @@ public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws Engine logger.trace("finish flush for snapshot"); } try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); logger.trace("pulling snapshot"); - return new IndexCommitRef(deletionPolicy); + return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy()); } catch (IOException e) { throw new SnapshotFailedEngineException(shardId, e); } @@ -1781,10 +1770,9 @@ protected void doRun() throws Exception { * @param writer the index writer to commit * @param translog the translog * @param syncId the sync flush ID ({@code null} if not committing a synced flush) - * @return the minimum translog generation for the local checkpoint committed with the specified index writer * @throws IOException if an I/O exception occurs committing the specfied writer */ - private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { + private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { ensureCanFlush(); try { final long localCheckpoint = seqNoService().getLocalCheckpoint(); @@ -1817,7 +1805,6 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog }); writer.commit(); - return translogGeneration.translogFileGeneration; } catch (final Exception ex) { try { failEngine("lucene commit failed", ex); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 13abf5537859a..36ac6b61adf78 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -256,8 +256,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); - this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, - bigArrays); + this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 7b6922e786723..1314504e397ec 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -37,8 +37,8 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -54,10 +54,9 @@ import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Optional; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -76,19 +75,17 @@ * between the lucene index an the transaction log file. This UUID is used to prevent accidental recovery from a transaction log that belongs to a * different engine. *

- * Each Translog has only one translog file open at any time referenced by a translog generation ID. This ID is written to a translog.ckp file that is designed - * to fit in a single disk block such that a write of the file is atomic. The checkpoint file is written on each fsync operation of the translog and records the number of operations - * written, the current translogs file generation and it's fsynced offset in bytes. + * Each Translog has only one translog file open for writes at any time referenced by a translog generation ID. This ID is written to a + * translog.ckp file that is designed to fit in a single disk block such that a write of the file is atomic. The checkpoint file + * is written on each fsync operation of the translog and records the number of operations written, the current translog's file generation, + * its fsynced offset in bytes, and other important statistics. *

*

- * When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations. - * The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against - * the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next - * generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are - * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case - * the currently being committed translog file will not be deleted since it's commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than - * one translog file present. Such an uncommitted translog file always has a translog-${gen}.ckp associated with it which is an fsynced copy of the it's last translog.ckp such that in - * disaster recovery last fsynced offsets, number of operation etc. are still preserved. + * When the current translog file reaches a certain size ({@link IndexSettings#INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING}, or when + * a clear separation between old and new operations (upon change in primary term), the current file is reopened for read only and a new + * write only file is created. Any non-current, read only translog file always has a translog-${gen}.ckp associated with it + * which is an fsynced copy of its last translog.ckp such that in disaster recovery last fsynced offsets, number of + * operation etc. are still preserved. *

*/ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable { @@ -111,23 +108,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); - // this is a concurrent set and is not protected by any of the locks. The main reason - // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed) - private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); private BigArrays bigArrays; protected final ReleasableLock readLock; protected final ReleasableLock writeLock; private final Path location; private TranslogWriter current; - private static final long NOT_SET_GENERATION = -1; // -1 is safe as it will not cause a translog deletion. - - private volatile long currentCommittingGeneration = NOT_SET_GENERATION; - private volatile long lastCommittedTranslogFileGeneration = NOT_SET_GENERATION; private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; private final String translogUUID; + private final TranslogDeletionPolicy deletionPolicy; /** * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is @@ -137,20 +128,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * translog file referenced by this generation. The translog creation will fail if this generation can't be opened. * * @param config the configuration of this translog - * @param translogGeneration the translog generation to open + * @param expectedTranslogUUID the translog uuid to open, null for a new translog + * @param deletionPolicy an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely + * deleted * @param globalCheckpointSupplier a supplier for the global checkpoint */ public Translog( - final TranslogConfig config, - final TranslogGeneration translogGeneration, + final TranslogConfig config, final String expectedTranslogUUID, TranslogDeletionPolicy deletionPolicy, final LongSupplier globalCheckpointSupplier) throws IOException { super(config.getShardId(), config.getIndexSettings()); this.config = config; this.globalCheckpointSupplier = globalCheckpointSupplier; - if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case + this.deletionPolicy = deletionPolicy; + if (expectedTranslogUUID == null) { translogUUID = UUIDs.randomBase64UUID(); } else { - translogUUID = translogGeneration.translogUUID; + translogUUID = expectedTranslogUUID; } bigArrays = config.getBigArrays(); ReadWriteLock rwl = new ReentrantReadWriteLock(); @@ -160,7 +153,7 @@ public Translog( Files.createDirectories(this.location); try { - if (translogGeneration != null) { + if (expectedTranslogUUID != null) { final Checkpoint checkpoint = readCheckpoint(location); final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); @@ -172,19 +165,18 @@ public Translog( // // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists // if not we don't even try to clean it up and wait until we fail creating it - assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]"; + assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(expectedTranslogUUID) : "unexpected translog file: [" + nextTranslogFile + "]"; if (Files.exists(currentCheckpointFile) // current checkpoint is already copied && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName()); } - this.readers.addAll(recoverFromFiles(translogGeneration, checkpoint)); + this.readers.addAll(recoverFromFiles(deletionPolicy.getMinTranslogGenerationForRecovery(), checkpoint)); if (readers.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); } boolean success = false; try { current = createWriter(checkpoint.generation + 1); - this.lastCommittedTranslogFileGeneration = translogGeneration.translogFileGeneration; success = true; } finally { // we have to close all the recovered ones otherwise we leak file handles here @@ -196,15 +188,15 @@ public Translog( } } else { IOUtils.rm(location); - logger.debug("wipe translog location - creating new translog"); + // start from whatever generation lucene points to + final long generation = deletionPolicy.getMinTranslogGenerationForRecovery(); + logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation); Files.createDirectories(location); - final long generation = 1; final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong()); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); current = createWriter(generation); - this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION; } // now that we know which files are there, create a new current one. @@ -217,7 +209,7 @@ public Translog( } /** recover all translog files found on disk */ - private ArrayList recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException { + private ArrayList recoverFromFiles(long translogFileGeneration, Checkpoint checkpoint) throws IOException { boolean success = false; ArrayList foundTranslogs = new ArrayList<>(); final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work @@ -225,16 +217,21 @@ private ArrayList recoverFromFiles(TranslogGeneration translogGe try (ReleasableLock lock = writeLock.acquire()) { logger.debug("open uncommitted translog checkpoint {}", checkpoint); final String checkpointTranslogFile = getFilename(checkpoint.generation); - for (long i = translogGeneration.translogFileGeneration; i < checkpoint.generation; i++) { + // we open files in reverse order in order to validate tranlsog uuid before we start traversing the translog based on + // the generation id we found in the lucene commit. This gives for better error messages if the wrong + // translog was found. + foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint)); + for (long i = checkpoint.generation - 1; i >= translogFileGeneration; i--) { Path committedTranslogFile = location.resolve(getFilename(i)); if (Files.exists(committedTranslogFile) == false) { - throw new IllegalStateException("translog file doesn't exist with generation: " + i + " lastCommitted: " + lastCommittedTranslogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive"); + throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " + + translogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive"); } final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)))); foundTranslogs.add(reader); logger.debug("recovered local translog from checkpoint {}", checkpoint); } - foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint)); + Collections.reverse(foundTranslogs); Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); if (Files.exists(commitCheckpoint)) { Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint); @@ -339,14 +336,14 @@ public long currentFileGeneration() { * Returns the number of operations in the transaction files that aren't committed to lucene.. */ public int totalOperations() { - return totalOperations(lastCommittedTranslogFileGeneration); + return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery()); } /** * Returns the size in bytes of the translog files that aren't committed to lucene. */ public long sizeInBytes() { - return sizeInBytes(lastCommittedTranslogFileGeneration); + return sizeInBytes(deletionPolicy.getMinTranslogGenerationForRecovery()); } /** @@ -517,9 +514,13 @@ private Snapshot createSnapshot(long minGeneration) { public Translog.View newView() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - View view = new View(lastCommittedTranslogFileGeneration); - outstandingViews.add(view); - return view; + final long viewGen = deletionPolicy.acquireTranslogGenForView(); + try { + return new View(viewGen); + } catch (Exception e) { + deletionPolicy.releaseTranslogGenView(viewGen); + throw e; + } } } @@ -628,6 +629,11 @@ public TranslogConfig getConfig() { return config; } + // public for testing + public TranslogDeletionPolicy getDeletionPolicy() { + return deletionPolicy; + } + /** * a view into the translog, capturing all translog file at the moment of creation * and updated with any future translog. @@ -679,9 +685,8 @@ void ensureOpen() { @Override public void close() throws IOException { if (closed.getAndSet(true) == false) { - logger.trace("closing view starting at translog [{}]", minTranslogGeneration()); - boolean removed = outstandingViews.remove(this); - assert removed : "View was never set but was supposed to be removed"; + logger.trace("closing view starting at translog [{}]", minGeneration); + deletionPolicy.releaseTranslogGenView(minGeneration); trimUnreferencedReaders(); closeFilesIfNoPendingViews(); } @@ -1429,72 +1434,20 @@ public void rollGeneration() throws IOException { } /** - * Prepares a translog commit by setting the current committing generation and rolling the translog generation. - * - * @throws IOException if an I/O exception occurred while rolling the translog generation + * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum + * required generation */ - public void prepareCommit() throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { - ensureOpen(); - if (currentCommittingGeneration != NOT_SET_GENERATION) { - final String message = - String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration); - throw new IllegalStateException(message); - } - currentCommittingGeneration = current.getGeneration(); - rollGeneration(); - } - } - - /** - * Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation - * will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved. - * - * If {@link Translog#prepareCommit()} was not called before calling commit, this method will be invoked too causing the translog - * generation to be rolled. - * - * @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations - * @throws IOException if an I/O exception occurred preparing the translog commit - */ - public void commit(final long committedGeneration) throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { - ensureOpen(); - assert assertCommittedGenerationIsInValidRange(committedGeneration); - if (currentCommittingGeneration == NOT_SET_GENERATION) { - prepareCommit(); - } - assert currentCommittingGeneration != NOT_SET_GENERATION; - assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) - : "readers missing committing generation [" + currentCommittingGeneration + "]"; - // set the last committed generation otherwise old files will not be cleaned up - lastCommittedTranslogFileGeneration = committedGeneration; - currentCommittingGeneration = NOT_SET_GENERATION; - trimUnreferencedReaders(); - } - } - - private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) { - assert committedGeneration <= current.generation - : "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]"; - final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE); - assert committedGeneration >= min - : "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]"; - return true; - } - - /** - * Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views - * and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}. - */ - void trimUnreferencedReaders() { + public void trimUnreferencedReaders() { try (ReleasableLock ignored = writeLock.acquire()) { if (closed.get()) { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = Math.min( - lastCommittedTranslogFileGeneration, - outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE)); + long minReferencedGen = deletionPolicy.minTranslogGenRequired(); + final long minExistingGen = readers.isEmpty() ? current.getGeneration() : readers.get(0).getGeneration(); + assert minReferencedGen >= minExistingGen : + "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + + minExistingGen + "]"; final List unreferenced = readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList()); for (final TranslogReader unreferencedReader : unreferenced) { @@ -1510,7 +1463,7 @@ void trimUnreferencedReaders() { void closeFilesIfNoPendingViews() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { - if (closed.get() && outstandingViews.isEmpty()) { + if (closed.get() && deletionPolicy.pendingViewsCount() == 0) { logger.trace("closing files. translog is closed and there are no pending views"); ArrayList toClose = new ArrayList<>(readers); toClose.add(current); @@ -1567,13 +1520,6 @@ private void ensureOpen() { } } - /** - * The number of currently open views - */ - int getNumOpenViews() { - return outstandingViews.size(); - } - ChannelFactory getChannelFactory() { return FileChannel::open; } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java new file mode 100644 index 0000000000000..84f61a642cc8e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.util.Counter; + +import java.util.HashMap; +import java.util.Map; + +public class TranslogDeletionPolicy { + + /** Records how many views are held against each + * translog generation */ + private final Map translogRefCounts = new HashMap<>(); + + /** + * the translog generation that is requires to properly recover from the oldest non deleted + * {@link org.apache.lucene.index.IndexCommit}. + */ + private long minTranslogGenerationForRecovery = 1; + + public synchronized void setMinTranslogGenerationForRecovery(long newGen) { + if (newGen < minTranslogGenerationForRecovery) { + throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" + + minTranslogGenerationForRecovery+ "]"); + } + minTranslogGenerationForRecovery = newGen; + } + + /** + * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation + * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. + */ + synchronized long acquireTranslogGenForView() { + translogRefCounts.computeIfAbsent(minTranslogGenerationForRecovery, l -> Counter.newCounter(false)).addAndGet(1); + return minTranslogGenerationForRecovery; + } + + /** returns the number of generations that were acquired for views */ + synchronized int pendingViewsCount() { + return translogRefCounts.size(); + } + + /** + * releases a generation that was acquired by {@link #acquireTranslogGenForView()} + */ + synchronized void releaseTranslogGenView(long translogGen) { + Counter current = translogRefCounts.get(translogGen); + if (current == null || current.get() <= 0) { + throw new IllegalArgumentException("translog gen [" + translogGen + "] wasn't acquired"); + } + if (current.addAndGet(-1) == 0) { + translogRefCounts.remove(translogGen); + } + } + + /** + * returns the minimum translog generation that is still required by the system. Any generation below + * the returned value may be safely deleted + */ + synchronized long minTranslogGenRequired() { + long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); + return Math.min(viewRefs, minTranslogGenerationForRecovery); + } + + /** returns the translog generation that will be used as a basis of a future store/peer recovery */ + public synchronized long getMinTranslogGenerationForRecovery() { + return minTranslogGenerationForRecovery; + } +} diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java new file mode 100644 index 0000000000000..d21273a7b0335 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CombinedDeletionPolicyTests extends ESTestCase { + + public void testPassThrough() throws IOException { + SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); + CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(), + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); + List commitList = new ArrayList<>(); + long count = randomIntBetween(1, 3); + for (int i = 0; i < count; i++) { + commitList.add(mockIndexCommitWithTranslogGen(randomNonNegativeLong())); + } + combinedDeletionPolicy.onInit(commitList); + verify(indexDeletionPolicy, times(1)).onInit(commitList); + combinedDeletionPolicy.onCommit(commitList); + verify(indexDeletionPolicy, times(1)).onCommit(commitList); + } + + public void testSettingMinTranslogGen() throws IOException { + SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); + final TranslogDeletionPolicy translogDeletionPolicy = mock(TranslogDeletionPolicy.class); + CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, translogDeletionPolicy, + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); + List commitList = new ArrayList<>(); + long count = randomIntBetween(10, 20); + long lastGen = 0; + for (int i = 0; i < count; i++) { + lastGen += randomIntBetween(10, 20000); + commitList.add(mockIndexCommitWithTranslogGen(lastGen)); + } + combinedDeletionPolicy.onInit(commitList); + verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); + commitList.clear(); + for (int i = 0; i < count; i++) { + lastGen += randomIntBetween(10, 20000); + commitList.add(mockIndexCommitWithTranslogGen(lastGen)); + } + combinedDeletionPolicy.onCommit(commitList); + verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); + } + + IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException { + IndexCommit commit = mock(IndexCommit.class); + when(commit.getUserData()).thenReturn(Collections.singletonMap(Translog.TRANSLOG_GENERATION_KEY, Long.toString(gen))); + return commit; + } +} diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8f97da31a6b39..42eef064a9385 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -126,6 +126,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.test.DummyShardLock; @@ -165,6 +166,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -259,8 +261,8 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(), - config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort()); + config.getQueryCachingPolicy(), config.getTranslogConfig(), + config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort()); } @Override @@ -331,13 +333,18 @@ protected Translog createTranslog() throws IOException { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); } + protected InternalEngine createEngine(Store store, Path translogPath, + Function sequenceNumbersServiceSupplier) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); + } + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { return createEngine(indexSettings, store, translogPath, mergePolicy, null); @@ -353,7 +360,7 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Supplier sequenceNumbersServiceSupplier) throws IOException { + @Nullable Function sequenceNumbersServiceSupplier) throws IOException { return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null); } @@ -363,7 +370,7 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Supplier sequenceNumbersServiceSupplier, + @Nullable Function sequenceNumbersServiceSupplier, @Nullable Sort indexSort) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); @@ -380,7 +387,7 @@ public interface IndexWriterFactory { } public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, - @Nullable final Supplier sequenceNumbersServiceSupplier, + @Nullable final Function sequenceNumbersServiceSupplier, final EngineConfig config) { return new InternalEngine(config) { @Override @@ -392,7 +399,7 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx @Override public SequenceNumbersService seqNoService() { - return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService(); + return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService(); } }; } @@ -696,25 +703,18 @@ public void testSegmentsStatsIncludingFileSizes() throws Exception { } public void testCommitStats() throws IOException { - InternalEngine engine = null; - try { - this.engine.close(); - - final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); - final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - - engine = new InternalEngine(copy(this.engine.config(), this.engine.config().getOpenMode())) { - @Override - public SequenceNumbersService seqNoService() { - return new SequenceNumbersService( - this.config().getShardId(), - this.config().getIndexSettings(), + final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); + try ( + Store store = createStore(); + InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService( + config.getShardId(), + config.getIndexSettings(), maxSeqNo.get(), localCheckpoint.get(), - globalCheckpoint.get()); - } - }; + globalCheckpoint.get()) + )) { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); @@ -751,8 +751,6 @@ public SequenceNumbersService seqNoService() { assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); assertThat(stats2.getUserData(), hasKey(SequenceNumbers.MAX_SEQ_NO)); assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo.get())); - } finally { - IOUtils.close(engine); } } @@ -877,26 +875,24 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { final int docs = randomIntBetween(1, 4096); final List seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList()); Randomness.shuffle(seqNos); - engine.close(); Engine initialEngine = null; + Engine recoveringEngine = null; + Store store = createStore(); + final AtomicInteger counter = new AtomicInteger(); try { - final AtomicInteger counter = new AtomicInteger(); - initialEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG)) { - @Override - public SequenceNumbersService seqNoService() { - return new SequenceNumbersService( - engine.shardId, - engine.config().getIndexSettings(), - SequenceNumbersService.NO_OPS_PERFORMED, - SequenceNumbersService.NO_OPS_PERFORMED, - SequenceNumbersService.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - return seqNos.get(counter.getAndIncrement()); - } - }; + initialEngine = createEngine(store, createTempDir(), (config) -> + new SequenceNumbersService( + config.getShardId(), + config.getIndexSettings(), + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.UNASSIGNED_SEQ_NO) { + @Override + public long generateSeqNo() { + return seqNos.get(counter.getAndIncrement()); + } } - }; + ); for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); @@ -907,12 +903,7 @@ public long generateSeqNo() { initialEngine.flush(); } } - } finally { - IOUtils.close(initialEngine); - } - - Engine recoveringEngine = null; - try { + initialEngine.close(); recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); recoveringEngine.recoverFromTranslog(); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { @@ -920,9 +911,8 @@ public long generateSeqNo() { assertEquals(docs, topDocs.totalHits); } } finally { - IOUtils.close(recoveringEngine); + IOUtils.close(initialEngine, recoveringEngine, store); } - } public void testConcurrentGetAndFlush() throws Exception { @@ -1148,6 +1138,20 @@ public void testSearchResultRelease() throws Exception { searchResult.close(); } + public void testCommitAdvancesMinTranslogForRecovery() throws IOException { + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.flush(); + assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L)); + engine.flush(); + assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L)); + engine.flush(true, true); + assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(3L)); + } + public void testSyncedFlush() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { @@ -2479,13 +2483,11 @@ public void testSkipTranslogReplay() throws IOException { } assertVisibleCount(engine, numDocs); engine.close(); - engine = new InternalEngine(engine.config()); - + engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits, equalTo(0)); } - } private Mapping dynamicUpdate() { @@ -2713,15 +2715,15 @@ public void testRecoverFromForeignTranslog() throws IOException { Translog translog = new Translog( new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - null, - () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); EngineConfig config = engine.config(); /* create a TranslogConfig that has been created with a different UUID */ - TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE); + TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), + BigArrays.NON_RECYCLING_INSTANCE); EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), @@ -2822,6 +2824,8 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); + assertEquals(1, engine.getTranslog().currentFileGeneration()); + assertEquals(0L, engine.getTranslog().totalOperations()); } } @@ -3539,7 +3543,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); final List threads = new ArrayList<>(); final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService); + initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); final InternalEngine finalInitialEngine = initialEngine; for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); @@ -3767,7 +3771,7 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); final Map threads = new LinkedHashMap<>(); final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService); + actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); final InternalEngine finalActualEngine = actualEngine; final Translog translog = finalActualEngine.getTranslog(); final long generation = finalActualEngine.getTranslog().currentFileGeneration(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index f99de847238a3..d1d7aebc92efd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -108,7 +108,7 @@ public Directory newDirectory() throws IOException { store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, createTempDir("translog"), indexSettings, - BigArrays.NON_RECYCLING_INSTANCE); + BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener eventListener = new Engine.EventListener() { @Override public void onFailedEngine(String reason, @Nullable Exception e) { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 5f75610e2e8dd..d52adf37d6e56 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -126,7 +126,8 @@ protected void afterIfSuccessful() throws Exception { if (translog.isOpen()) { if (translog.currentFileGeneration() > 1) { - translog.commit(translog.currentFileGeneration()); + markCurrentGenAsCommitted(translog); + translog.trimUnreferencedReaders(); assertFileDeleted(translog, translog.currentFileGeneration() - 1); } translog.close(); @@ -136,6 +137,24 @@ protected void afterIfSuccessful() throws Exception { } + protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException { + return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + + private void markCurrentGenAsCommitted(Translog translog) { + commit(translog, translog.currentFileGeneration()); + } + + private void rollAndCommit(Translog translog) throws IOException { + translog.rollGeneration(); + commit(translog, translog.currentFileGeneration()); + } + + private void commit(Translog translog, long genToCommit) { + translog.getDeletionPolicy().setMinTranslogGenerationForRecovery(genToCommit); + translog.trimUnreferencedReaders(); + } + @Override @Before public void setUp() throws Exception { @@ -149,7 +168,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { try { - assertEquals("there are still open views", 0, translog.getNumOpenViews()); + assertEquals("there are still open views", 0, translog.getDeletionPolicy().pendingViewsCount()); translog.close(); } finally { super.tearDown(); @@ -158,7 +177,7 @@ public void tearDown() throws Exception { private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get()); + return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get()); } private TranslogConfig getTranslogConfig(final Path path) { @@ -182,7 +201,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } - protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { + private void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { list.add(op); translog.add(op); } @@ -282,14 +301,14 @@ public void testSimpleOperations() throws IOException { assertNull(snapshot.next()); long firstId = translog.currentFileGeneration(); - translog.prepareCommit(); + translog.rollGeneration(); assertThat(translog.currentFileGeneration(), Matchers.not(equalTo(firstId))); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(ops.size())); - translog.commit(translog.currentFileGeneration()); + markCurrentGenAsCommitted(translog); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); assertThat(snapshot.totalOperations(), equalTo(0)); @@ -346,7 +365,7 @@ public void testStats() throws IOException { } final long expectedSizeInBytes = 266L; - translog.prepareCommit(); + translog.rollGeneration(); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); @@ -373,7 +392,7 @@ public void testStats() throws IOException { } } - translog.commit(translog.currentFileGeneration()); + markCurrentGenAsCommitted(translog); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(0L)); @@ -441,12 +460,12 @@ public void testSnapshotWithNewTranslog() throws IOException { assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); - translog.prepareCommit(); + translog.rollGeneration(); addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3})); try (Translog.View view = translog.newView()) { Translog.Snapshot snapshot2 = translog.newSnapshot(); - translog.commit(translog.currentFileGeneration()); + markCurrentGenAsCommitted(translog); assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } @@ -517,7 +536,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { threads[i].join(60 * 1000); } - List collect = writtenOperations.stream().collect(Collectors.toList()); + List collect = new ArrayList<>(writtenOperations); Collections.sort(collect); Translog.Snapshot snapshot = translog.newSnapshot(); for (LocationOperation locationOperation : collect) { @@ -581,7 +600,7 @@ public void testTranslogChecksums() throws Exception { corruptionsCaught.incrementAndGet(); } } - expectThrows(TranslogCorruptedException.class, () -> snapshot.next()); + expectThrows(TranslogCorruptedException.class, snapshot::next); assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1)); } @@ -725,8 +744,8 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep if (id % flushEveryOps == 0) { synchronized (flushMutex) { // we need not do this concurrently as we need to make sure that the generation - // we're committing - translog.currentFileGeneration() - is still present when we're committing - translog.commit(translog.currentFileGeneration()); + // we're committing - is still present when we're committing + rollAndCommit(translog); } } if (id % 7 == 0) { @@ -872,7 +891,7 @@ public void testSyncUpTo() throws IOException { assertTrue("we only synced a previous operation yet", translog.syncNeeded()); } if (rarely()) { - translog.commit(translog.currentFileGeneration()); + rollAndCommit(translog); assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } @@ -892,7 +911,7 @@ public void testSyncUpToStream() throws IOException { ArrayList locations = new ArrayList<>(); for (int op = 0; op < translogOperations; op++) { if (rarely()) { - translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry + rollAndCommit(translog); // do this first so that there is at least one pending tlog entry } final Translog.Location location = translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); @@ -904,7 +923,7 @@ public void testSyncUpToStream() throws IOException { assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream())); assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced } else if (rarely()) { - translog.commit(translog.currentFileGeneration()); + rollAndCommit(translog); assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } else { @@ -925,7 +944,7 @@ public void testLocationComparison() throws IOException { locations.add( translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); if (rarely() && translogOperations > op + 1) { - translog.commit(translog.currentFileGeneration()); + rollAndCommit(translog); } } Collections.shuffle(locations, random()); @@ -1091,7 +1110,7 @@ public void testBasicRecovery() throws IOException { locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { - translog.commit(translog.currentFileGeneration()); + rollAndCommit(translog); minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } @@ -1100,14 +1119,15 @@ public void testBasicRecovery() throws IOException { TranslogConfig config = translog.getConfig(); translog.close(); - translog = new Translog(config, translogGeneration,() -> SequenceNumbersService.UNASSIGNED_SEQ_NO); if (translogGeneration == null) { + translog = createTranslog(config, null); assertEquals(0, translog.stats().estimatedNumberOfOperations()); assertEquals(1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); Translog.Snapshot snapshot = translog.newSnapshot(); assertNull(snapshot.next()); } else { + translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); Translog.Snapshot snapshot = translog.newSnapshot(); @@ -1130,7 +1150,7 @@ public void testRecoveryUncommitted() throws IOException { locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); - translog.prepareCommit(); + translog.rollGeneration(); assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration); assertNotNull(translogGeneration.translogUUID); } @@ -1141,7 +1161,9 @@ public void testRecoveryUncommitted() throws IOException { // we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted // translog here as well. TranslogConfig config = translog.getConfig(); - try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1154,7 +1176,7 @@ public void testRecoveryUncommitted() throws IOException { } } if (randomBoolean()) { // recover twice - try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1180,7 +1202,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); - translog.prepareCommit(); + translog.rollGeneration(); assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration); assertNotNull(translogGeneration.translogUUID); } @@ -1195,7 +1217,9 @@ public void testRecoveryUncommittedFileExists() throws IOException { Checkpoint read = Checkpoint.read(ckp); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); - try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1210,7 +1234,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { } if (randomBoolean()) { // recover twice - try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1235,7 +1259,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); - translog.prepareCommit(); + translog.rollGeneration(); assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration); assertNotNull(translogGeneration.translogUUID); } @@ -1248,7 +1272,9 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { Checkpoint read = Checkpoint.read(ckp); Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO); Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " + @@ -1256,7 +1282,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage()); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); - try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1319,23 +1345,25 @@ public void testOpenForeignTranslog() throws IOException { for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { - translog.commit(translog.currentFileGeneration()); + rollAndCommit(translog); firstUncommitted = op + 1; } } - TranslogConfig config = translog.getConfig(); + final TranslogConfig config = translog.getConfig(); + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); - Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, - translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration); + final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, + translogGeneration.translogUUID.length()); try { - new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { } - this.translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); Translog.Snapshot snapshot = this.translog.newSnapshot(); for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); @@ -1509,7 +1537,7 @@ public void testFailFlush() throws IOException { } try { - translog.commit(translog.currentFileGeneration()); + rollAndCommit(translog); fail("already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); @@ -1518,7 +1546,9 @@ public void testFailFlush() throws IOException { assertFalse(translog.isOpen()); translog.close(); // we are closed - try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); @@ -1554,7 +1584,7 @@ public void testTragicEventCanBeAnyException() throws IOException { Path tempDir = createTempDir(); final FailSwitch fail = new FailSwitch(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = getFailableTranslog(fail, config, false, true, null); + Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); @@ -1583,6 +1613,7 @@ public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, TranslogConfig config = getTranslogConfig(tempDir); Translog translog = getFailableTranslog(fail, config); + final String translogUUID = translog.getTranslogUUID(); final int threadCount = randomIntBetween(1, 5); Thread[] threads = new Thread[threadCount]; @@ -1648,7 +1679,7 @@ protected void afterAdd() throws IOException { iterator.remove(); } } - try (Translog tlog = new Translog(config, translog.getGeneration(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = tlog.newSnapshot(); if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1669,7 +1700,7 @@ protected void afterAdd() throws IOException { } private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { - return getFailableTranslog(fail, config, randomBoolean(), false, null); + return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); } private static class FailSwitch { @@ -1702,8 +1733,10 @@ public void onceFailedFailAlways() { } - private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException { - return new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { + private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites, + final boolean throwUnknownException, String translogUUID, + final TranslogDeletionPolicy deletionPolicy) throws IOException { + return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override ChannelFactory getChannelFactory() { final ChannelFactory factory = super.getChannelFactory(); @@ -1713,7 +1746,7 @@ ChannelFactory getChannelFactory() { boolean success = false; try { final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation - ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : paritalWrites, throwUnknownException, channel); + ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : partialWrites, throwUnknownException, channel); success = true; return throwingFileChannel; } finally { @@ -1815,12 +1848,11 @@ private static final class UnknownException extends RuntimeException { public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { Path tempDir = createTempDir(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + Translog translog = createTranslog(config, null); translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); - Translog.TranslogGeneration generation = translog.getGeneration(); translog.close(); try { - new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { + new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override protected TranslogWriter createWriter(long fileGeneration) throws IOException { throw new MockDirectoryWrapper.FakeIOException(); @@ -1835,7 +1867,6 @@ protected TranslogWriter createWriter(long fileGeneration) throws IOException { public void testRecoverWithUnbackedNextGen() throws IOException { translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); - Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); TranslogConfig config = translog.getConfig(); @@ -1843,8 +1874,7 @@ public void testRecoverWithUnbackedNextGen() throws IOException { Checkpoint read = Checkpoint.read(ckp); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); - try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - assertNotNull(translogGeneration); + try (Translog tlog = createTranslog(config, translog.getTranslogUUID())) { assertFalse(tlog.syncNeeded()); Translog.Snapshot snapshot = tlog.newSnapshot(); for (int i = 0; i < 1; i++) { @@ -1854,8 +1884,7 @@ public void testRecoverWithUnbackedNextGen() throws IOException { } tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } - try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - assertNotNull(translogGeneration); + try (Translog tlog = createTranslog(config, translog.getTranslogUUID())) { assertFalse(tlog.syncNeeded()); Translog.Snapshot snapshot = tlog.newSnapshot(); for (int i = 0; i < 2; i++) { @@ -1868,7 +1897,6 @@ public void testRecoverWithUnbackedNextGen() throws IOException { public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); - Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); TranslogConfig config = translog.getConfig(); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); @@ -1877,7 +1905,7 @@ public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); try { - Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + Translog tlog = new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("file already exists?"); } catch (TranslogException ex) { // all is well @@ -1888,9 +1916,10 @@ public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); - Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); TranslogConfig config = translog.getConfig(); + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint read = Checkpoint.read(ckp); @@ -1898,8 +1927,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); - try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - assertNotNull(translogGeneration); + try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertFalse(tlog.syncNeeded()); Translog.Snapshot snapshot = tlog.newSnapshot(); for (int i = 0; i < 1; i++) { @@ -1911,7 +1939,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { } try { - Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("file already exists?"); } catch (TranslogException ex) { // all is well @@ -1933,14 +1961,15 @@ public void testWithRandomException() throws IOException { fail.failRandomly(); TranslogConfig config = getTranslogConfig(tempDir); final int numOps = randomIntBetween(100, 200); + long minGenForRecovery = 1; List syncedDocs = new ArrayList<>(); List unsynced = new ArrayList<>(); if (randomBoolean()) { fail.onceFailedFailAlways(); } - Translog.TranslogGeneration generation = null; + String generationUUID = null; try { - final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generation); + final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy()); try { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { @@ -1956,10 +1985,7 @@ public void testWithRandomException() throws IOException { failableTLog.sync(); // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails syncedDocs.addAll(unsynced); unsynced.clear(); - if (randomBoolean()) { - failableTLog.prepareCommit(); - } - failableTLog.commit(translog.currentFileGeneration()); + rollAndCommit(failableTLog); syncedDocs.clear(); } } @@ -1979,7 +2005,8 @@ public void testWithRandomException() throws IOException { syncedDocs.addAll(unsynced); // failed in fsync but got fully written unsynced.clear(); } - generation = failableTLog.getGeneration(); + generationUUID = failableTLog.getTranslogUUID(); + minGenForRecovery = failableTLog.getDeletionPolicy().getMinTranslogGenerationForRecovery(); IOUtils.closeWhileHandlingException(failableTLog); } } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -1990,7 +2017,9 @@ public void testWithRandomException() throws IOException { // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery if (randomBoolean()) { try { - IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generation)); + TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { // failed - that's ok, we didn't even create it } catch (IOException ex) { @@ -1999,7 +2028,9 @@ public void testWithRandomException() throws IOException { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - try (Translog translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = translog.newSnapshot(); assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { @@ -2057,18 +2088,19 @@ public void testCheckpointOnDiskFull() throws IOException { */ public void testPendingDelete() throws IOException { translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); - translog.prepareCommit(); - Translog.TranslogGeneration generation = translog.getGeneration(); + translog.rollGeneration(); TranslogConfig config = translog.getConfig(); + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); translog.close(); - translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); - translog.prepareCommit(); + translog.rollGeneration(); Translog.View view = translog.newView(); translog.add(new Translog.Index("test", "3", 2, new byte[]{3})); translog.close(); IOUtils.close(view); - translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } public static Translog.Location randomTranslogLocation() { @@ -2140,14 +2172,13 @@ public void testRollGeneration() throws IOException { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); } - translog.commit(generation + rolls); - assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1)); + commit(translog, generation + rolls); + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); assertThat(translog.totalOperations(), equalTo(0)); for (int i = 0; i < rolls; i++) { assertFileDeleted(translog, generation + i); } assertFileIsPresent(translog, generation + rolls); - assertFileIsPresent(translog, generation + rolls + 1); } public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException { @@ -2172,7 +2203,7 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException } assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore)); - translog.prepareCommit(); + translog.rollGeneration(); assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1)); for (int i = 0; i <= rollsBefore + 1; i++) { @@ -2198,7 +2229,7 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException } } - translog.commit(generation + rollsBefore + 1); + commit(translog, generation + rollsBefore + 1); for (int i = 0; i <= rollsBefore; i++) { assertFileDeleted(translog, generation + i); @@ -2210,7 +2241,6 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException } public void testMinGenerationForSeqNo() throws IOException { - final long initialGeneration = translog.getGeneration().translogFileGeneration; final int operations = randomIntBetween(1, 4096); final List shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList()); Randomness.shuffle(shuffledSeqNos); @@ -2230,8 +2260,9 @@ public void testMinGenerationForSeqNo() throws IOException { } Map>> generations = new HashMap<>(); - - translog.commit(initialGeneration); + // one extra roll to make sure that all ops so far are available via a reader and a translog-{gen}.ckp + // file in a consistent way, in order to simplify checking code. + translog.rollGeneration(); for (long seqNo = 0; seqNo < operations; seqNo++) { final Set> seenSeqNos = new HashSet<>(); final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration; @@ -2271,7 +2302,7 @@ public void testSimpleCommit() throws IOException { final long generation = randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration())); - translog.commit(generation); + commit(translog, generation); for (long g = 0; g < generation; g++) { assertFileDeleted(translog, g); } @@ -2288,13 +2319,13 @@ public void testPrepareCommitAndCommit() throws IOException { translog.add(new Translog.NoOp(seqNo++, 0, "test")); if (rarely()) { final long generation = translog.currentFileGeneration(); - translog.prepareCommit(); + translog.rollGeneration(); if (rarely()) { // simulate generation filling up and rolling between preparing the commit and committing translog.rollGeneration(); } final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation)); - translog.commit(committedGeneration); + commit(translog, committedGeneration); last = committedGeneration; for (long g = 0; g < committedGeneration; g++) { assertFileDeleted(translog, g); @@ -2315,11 +2346,11 @@ public void testCommitWithOpenView() throws IOException { if (rarely()) { try (Translog.View ignored = translog.newView()) { final long viewGeneration = lastCommittedGeneration; - translog.prepareCommit(); + translog.rollGeneration(); final long committedGeneration = randomIntBetween( Math.max(1, Math.toIntExact(lastCommittedGeneration)), Math.toIntExact(translog.currentFileGeneration())); - translog.commit(committedGeneration); + commit(translog, committedGeneration); lastCommittedGeneration = committedGeneration; // with an open view, committing should preserve generations back to the last committed generation for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) { @@ -2334,5 +2365,4 @@ public void testCommitWithOpenView() throws IOException { } } } - }