Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand All @@ -37,14 +39,16 @@
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingIndexCommit;

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
LongSupplier globalCheckpointSupplier, IndexCommit startingIndexCommit) {
this.openMode = openMode;
this.startingIndexCommit = startingIndexCommit;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}
Expand All @@ -62,7 +66,14 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
break;
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
onCommit(commits);
if (startingIndexCommit == null) {
onCommit(commits);
} else {
assert commits.contains(startingIndexCommit) : "Existing commits must contain the starting commit; " +
"startingCommit [" + startingIndexCommit + "], commits [" + commits + "]";
commits.stream().filter(commit -> startingIndexCommit.equals(commit) == false).forEach(IndexCommit::delete);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need special handling here and need the start commit point? can you explain?

Copy link
Member Author

@dnhatn dnhatn Dec 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I calculated the retained translog generations incorrectly; I will revert this change. There is an issue with the local checkpoint; I will reach out to discuss with you @bleskes.

updateTranslogDeletionPolicy(startingIndexCommit, startingIndexCommit);
}
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
Expand All @@ -71,7 +82,7 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits);
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
Expand All @@ -90,12 +101,38 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

/**
* Selects a starting commit point from a list of existing commits based on the persisted global checkpoint from translog
* and the retained translog generations. All the required translog files of a starting commit point must exist,
* and its max seqno should be at most the global checkpoint from the translog checkpoint.
*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path)}
* @param minRetainedTranslogGen the minimum translog generation is retained, see {@link Translog#readMinReferencedTranslogGen(Path)}
*/
public static IndexCommit startingCommitPoint(List<IndexCommit> commits, long globalCheckpoint, long minRetainedTranslogGen)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering - should we call this findSafeCommit? I suspect that this will be use full for later too.

throws IOException {
if (commits.isEmpty()) {
throw new IllegalArgumentException("Commit list must not empty");
}
// Snapshotted commits may not have all its required translog.
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : commits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this? isn't the logic in indexOfKeptCommits enough to deal with this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous 6.x versions, we keep the last commit and translog for that commit only. If we take a snapshot and commit, we will have two commits but translog for the last commit only. During the store recovery, if the max_seqno of the last commit is greater than the global checkpoint, the Policy will pick the snapshotted commit although it does not have full translog.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I think we should keep this class clean and put this prefiltering in the engine, if we open an index created before 6.2. This way it will be clear when we can remove it.

recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "Unable to select a proper starting commit point; " +
"commits [" + commits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
final int keptPosition = indexOfKeptCommits(recoverableCommits, globalCheckpoint);
return recoverableCommits.get(keptPosition);
}

/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

// Commits are sorted by age (the 0th one is the oldest commit).
Expand All @@ -110,7 +147,7 @@ private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOExc
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand Down Expand Up @@ -78,6 +79,7 @@ public final class EngineConfig {
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final IndexCommit startingCommitPoint;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -124,7 +126,8 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) {
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
IndexCommit startingCommitPoint) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand Down Expand Up @@ -155,6 +158,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.startingCommitPoint = startingCommitPoint;
}

/**
Expand Down Expand Up @@ -380,4 +384,11 @@ public Sort getIndexSort() {
public CircuitBreakerService getCircuitBreakerService() {
return this.circuitBreakerService;
}

/**
* Returns the starting commit point that an engine should open with.
*/
public IndexCommit getStartingCommitPoint() {
return startingCommitPoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("recovered [{}]", seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint,
engineConfig.getStartingCommitPoint())
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
Expand Down Expand Up @@ -370,9 +371,9 @@ private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOExcep
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
return store.loadSeqNoStats(globalCheckpoint);
return store.loadSeqNoStats(globalCheckpoint, engineConfig.getStartingCommitPoint());
case OPEN_INDEX_CREATE_TRANSLOG:
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO, engineConfig.getStartingCommitPoint());
case CREATE_INDEX_AND_TRANSLOG:
return new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down Expand Up @@ -1796,6 +1797,7 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
private IndexWriter createWriter(boolean create) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(create);
iwc.setIndexCommit(engineConfig.getStartingCommitPoint());
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
Expand Down Expand Up @@ -140,6 +142,7 @@
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -2166,10 +2169,11 @@ private DocumentMapperForType docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type);
}

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) throws IOException {
Sort indexSort = indexSortSupplier.get();
final boolean forceNewHistoryUUID;
switch (shardRouting.recoverySource().getType()) {
final RecoverySource.Type recoveryType = shardRouting.recoverySource().getType();
switch (recoveryType) {
case EXISTING_STORE:
case PEER:
forceNewHistoryUUID = false;
Expand All @@ -2180,7 +2184,15 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
forceNewHistoryUUID = true;
break;
default:
throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]");
throw new AssertionError("unknown recovery type: [" + recoveryType + "]");
}
final IndexCommit startingCommit;
if (recoveryType == RecoverySource.Type.EXISTING_STORE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this needs to be out of engine? can't we do it in the engine construct as an invariant when opening an index (and translog)

Copy link
Member Author

@dnhatn dnhatn Dec 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I will move it to the Engine.

startingCommit = CombinedDeletionPolicy.startingCommitPoint(DirectoryReader.listCommits(store.directory()),
Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
Translog.readMinReferencedTranslogGen(translogConfig.getTranslogPath()));
} else {
startingCommit = null;
}
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
Expand All @@ -2189,7 +2201,7 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService);
indexSort, this::runTranslogRecovery, circuitBreakerService, startingCommit);
}

/**
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,17 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
}

/**
* Loads the local checkpoint and the maximum sequence number from the latest Lucene commit point and returns the triplet of local and
* global checkpoints, and maximum sequence number as an instance of {@link SeqNoStats}. The global checkpoint must be provided
* Loads the local checkpoint and the maximum sequence number from the provided Lucene commit point and returns the triplet of local
* and global checkpoints, and maximum sequence number as an instance of {@link SeqNoStats}. The global checkpoint must be provided
* externally as it is not stored in the commit point.
*
* @param globalCheckpoint the provided global checkpoint
* @param commit the commit point to load seqno stats, or the last commit in the store if the parameter is null
* @return an instance of {@link SeqNoStats} populated with the local and global checkpoints, and the maximum sequence number
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public SeqNoStats loadSeqNoStats(final long globalCheckpoint) throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
public SeqNoStats loadSeqNoStats(final long globalCheckpoint, final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, userData.entrySet());
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1703,6 +1703,17 @@ public static final long readGlobalCheckpoint(final Path location) throws IOExce
return readCheckpoint(location).globalCheckpoint;
}

/**
* Reads the minimum referenced generation translog generation from the translog checkpoint.
*
* @param location the location of the translog
* @return the minimum generation referenced by the translog.
* @throws IOException if an I/O exception occurred reading the checkpoint
*/
public static long readMinReferencedTranslogGen(final Path location) throws IOException {
return readCheckpoint(location).minTranslogGeneration;
}

/**
* Returns the translog uuid used to associate a lucene index with a translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint, null);
if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
assert seqNoStats.getLocalCheckpoint() <= seqNoStats.getGlobalCheckpoint();
/*
Expand Down
Loading