Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand All @@ -37,7 +39,7 @@
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
Expand Down Expand Up @@ -71,7 +73,7 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits);
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
Expand All @@ -90,12 +92,38 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

/**
* Selects a starting commit point from a list of existing commits based on the persisted global checkpoint from translog
* and the retained translog generations. All the required translog files of a starting commit point must exist,
* and its max seqno should be at most the global checkpoint from the translog checkpoint.
*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path)}
* @param minRetainedTranslogGen the minimum translog generation is retained, see {@link Translog#readMinReferencedTranslogGen(Path)}
*/
public static IndexCommit startingCommitPoint(List<IndexCommit> commits, long globalCheckpoint, long minRetainedTranslogGen)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering - should we call this findSafeCommit? I suspect that this will be use full for later too.

throws IOException {
if (commits.isEmpty()) {
throw new IllegalArgumentException("Commit list must not empty");
}
// Snapshotted commits may not have all its required translog.
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : commits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this? isn't the logic in indexOfKeptCommits enough to deal with this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous 6.x versions, we keep the last commit and translog for that commit only. If we take a snapshot and commit, we will have two commits but translog for the last commit only. During the store recovery, if the max_seqno of the last commit is greater than the global checkpoint, the Policy will pick the snapshotted commit although it does not have full translog.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I think we should keep this class clean and put this prefiltering in the engine, if we open an index created before 6.2. This way it will be clear when we can remove it.

recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "Unable to select a proper starting commit point; " +
"commits [" + commits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
final int keptPosition = indexOfKeptCommits(recoverableCommits, globalCheckpoint);
return recoverableCommits.get(keptPosition);
}

/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

// Commits are sorted by age (the 0th one is the oldest commit).
Expand All @@ -110,7 +138,7 @@ private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOExc
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -139,6 +141,7 @@ public class InternalEngine extends Engine {
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
private final IndexCommit startingCommit;
/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
Expand Down Expand Up @@ -179,6 +182,9 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
this.startingCommit = getStartingCommitPoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to make it a field? can't it be a parameter to createWriter ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used in two other places: #recoverFromTranslogInternal and #createLocalCheckpointTracker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be a parameter to createLocalCheckpointTracker and I left a comment on recoverFromTranslogInternal . The less state the better ;)

assert startingCommit == null || openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this boolean does what you want? if start commit is null it will always pass.

"OPEN_INDEX_AND_TRANSLOG must have starting commit; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -237,23 +243,24 @@ public InternalEngine(EngineConfig engineConfig) {

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
break;
case OPEN_INDEX_AND_TRANSLOG:
return localCheckpointTrackerSupplier.apply(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
case OPEN_INDEX_CREATE_TRANSLOG:
final Tuple<Long, Long> seqNoStats = store.loadSeqNoInfo();
maxSeqNo = seqNoStats.v1();
localCheckpoint = seqNoStats.v2();
final Tuple<Long, Long> seqNoInfo = store.loadSeqNoInfo(null);
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", seqNoInfo.v1(), seqNoInfo.v2());
return localCheckpointTrackerSupplier.apply(seqNoInfo.v1(), seqNoInfo.v2());
case OPEN_INDEX_AND_TRANSLOG:
// When recovering from a previous commit point, we use the local checkpoint from that commit,
// but the max_seqno from the last commit. This allows use to throw away stale operations.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this distinction? can't we keep it conceptually simple? also we recover everything from the translog, so I'm not sure how it can happen that we throw away stuff?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • If we use the local checkpoint from the last commit, operations having seqno less than or equal to the local checkpoint will be skipped -> we need to use the local checkpoint from the starting commit.

  • We fillSeqNoGaps until the max_seqno -> we need to use max_seqno from the last commit to have full history.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We fillSeqNoGaps until the max_seqno -> we need to use max_seqno from the last commit to have full history.

We do so after we replay the translog. at which point we'll know what the max seq is (and it may be higher than the one in the last commit, but it can't be lower)

assert startingCommit != null;
final long localCheckpoint = store.loadSeqNoInfo(startingCommit).v2();
final long maxSeqNo = store.loadSeqNoInfo(null).v1();
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
break;
default: throw new IllegalArgumentException("unknown type: " + openMode);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
default:
throw new IllegalArgumentException("unknown type: " + openMode);
}
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like the final approach better this gives a uniform return value (easier to understand) and it makes sure these things are set.

}

/**
Expand Down Expand Up @@ -381,6 +388,9 @@ public InternalEngine recoverFromTranslog() throws IOException {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
if (startingCommit == null) {
throw new IllegalStateException("Can't recover from translog without starting commit ");
}
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
Expand All @@ -401,10 +411,21 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final Path translogPath = engineConfig.getTranslogConfig().getTranslogPath();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
return CombinedDeletionPolicy.startingCommitPoint(existingCommits,
Translog.readGlobalCheckpoint(translogPath),
Translog.readMinReferencedTranslogGen(translogPath));
}
return null;
}

private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final long translogGen = Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the lastCommittedSegmentInfo should be set to the opening commit, no?

try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
Expand Down Expand Up @@ -1785,6 +1806,8 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
private IndexWriter createWriter(boolean create) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(create);
assert startingCommit == null || create == false : "Starting commit makes sense only when create=false";
iwc.setIndexCommit(startingCommit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be part of getIndexWriterConfig?

return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,14 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
}

/**
* Loads the maximum sequence number and local checkpoint from the latest Lucene commit point.
* Loads the maximum sequence number and local checkpoint from the given Lucene commit point or the latest if not provided.
*
* @param commit the commit point to load seqno stats, or the last commit in the store if the parameter is null
* @return a tuple populated with the maximum sequence number and the local checkpoint
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public Tuple<Long, Long> loadSeqNoInfo() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
public Tuple<Long, Long> loadSeqNoInfo(final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,17 @@ public static final long readGlobalCheckpoint(final Path location) throws IOExce
return readCheckpoint(location).globalCheckpoint;
}

/**
* Reads the minimum referenced generation translog generation from the translog checkpoint.
*
* @param location the location of the translog
* @return the minimum generation referenced by the translog.
* @throws IOException if an I/O exception occurred reading the checkpoint
*/
public static long readMinReferencedTranslogGen(final Path location) throws IOException {
return readCheckpoint(location).minTranslogGeneration;
}

/**
* Returns the translog uuid used to associate a lucene index with a translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final Tuple<Long, Long> seqNoStats = recoveryTarget.store().loadSeqNoInfo();
final Tuple<Long, Long> seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
long maxSeqNo = seqNoStats.v1();
long localCheckpoint = seqNoStats.v2();
if (maxSeqNo <= globalCheckpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,26 @@ public void testDeleteInvalidCommits() throws Exception {
}
}

public void testExcludeCommitWithoutTranslog() throws Exception {
final long globalCheckpoint = between(1, 1000);
final UUID translogUUID = UUID.randomUUID();
final long retainedTranslogGen = between(2, 1000);
List<IndexCommit> commitList = new ArrayList<>();
// Commit without full translog.
commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomLongBetween(0, retainedTranslogGen - 1)));
long lastMaxSeqNo = randomLongBetween(0, globalCheckpoint * 2);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, randomLongBetween(retainedTranslogGen, Long.MAX_VALUE)));
lastMaxSeqNo += between(1, 1000);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, randomLongBetween(retainedTranslogGen, Long.MAX_VALUE)));

IndexCommit startingCommit = CombinedDeletionPolicy.startingCommitPoint(commitList, globalCheckpoint, retainedTranslogGen);
if (lastMaxSeqNo <= globalCheckpoint){
assertThat(startingCommit, equalTo(commitList.get(2)));
}else{
assertThat(startingCommit, equalTo(commitList.get(1)));
}
}

IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
Expand Down
Loading