Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectIntCursor;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -42,12 +44,16 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The safe commit point is a constantly moving target (as the global checkpoint keeps going up and new commits are being added). I wonder if it's nicer to calculate the safe commit point when it's accessed in acquireIndexCommit, based on the then current globalcheckpoint and the current list of commits (This will require storing the last seen indexCommits, but that would be equivalent to what's being done in Lucene's SnapshotDeletionPolicy).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch Nhat has a follow up to trim unneeded commits as soon as the global checkpoint advances enough. This will have a side effect you mention (it will update things) with some added value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but that involves some extra machinery to update safeCommit at the right points in time (e.g. when gcp advances)? My suggestion here makes that unnecessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think the added value of the approach in the follow-up is that the clean-up logic is also possibly more eagerly invoked (when gcp advances).

private IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.snapshottedCommits = new ObjectIntHashMap<>();
}

@Override
Expand All @@ -70,18 +76,51 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
}

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
lastCommit = commits.get(commits.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert that the translog gen is in this commit is lower than all the ones in higher commits?

safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
commits.get(i).delete();
}
}
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
updateTranslogDeletionPolicy();
}

private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
/**
* Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
* Index files and translog of the capturing commit point won't be released until the commit reference is closed.
*
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized Engine.IndexCommitRef acquireIndexCommit(boolean acquiringSafeCommit) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
snapshottedCommits.addTo(snapshotting, 1); // increase refCount
return new Engine.IndexCommitRef(snapshotting, () -> releaseCommit(snapshotting));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we leave this to the engine and make the releaseCommit method public?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also return a commit wrapper that disable the delete method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

}

private synchronized void releaseCommit(IndexCommit releasingCommit) throws IOException {
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does that work? releasingCommit is a SnapshotIndexCommit ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snapshotting commits are stored as keys in a HashMap. Both SnapshotIndexCommit and regular index commit inherit equals and hashCode from the root IndexCommit, thus they are interchangeable. This can be problematic if a regular index commit overrides equals or hashCode.

I see some options to avoid this.

  1. Exposes SnapshotIndexCommit to package level; makes acquireCommit and releaseCommit with SnapshotIndexCommit type.
  2. Delegate hashCode and equals of SnapshotIndexCommit to the original index commit.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I think it's risky as the underlying IndexCommit may have a different implementation (it's not final). I see a 3rd option - we could use an identity map and sure people only release index commits they got from us. Until we need the ability to work all kind of crazyness like wrapped IndexCommits, I prefer to keep things strict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lucene's SnapshotDeletionPolicy identifies the IndexCommit's based on their generation (long field).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed and agreed to keep the current implementation.

"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
updateTranslogDeletionPolicy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does the translog care about the snapshotting situation? it only cares about the safe commit, no?

Copy link
Member Author

@dnhatn dnhatn Jan 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume that we have two commits c1 and c2 with c1 is the safe commit and c2 is the last commit. Clients acquire c1, then we have to keep translog of c1 until they release the commit. During that time, we have a new commit (or global checkpoint advanced), we can release c1 and its translog but have to keep them as they are being snapshotted. When clients release the commit c1, we should also release its translog rather than wait until the next onCommit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not release translog here. We should either keep or release both the commit and translog at the same time. I will update this.

}
}

private void updateTranslogDeletionPolicy() throws IOException {
assert Thread.holdsLock(this);
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
for (ObjectIntCursor<IndexCommit> entry : snapshottedCommits) {
assert entry.key.isDeleted() == false : "Snapshotted commit must not be deleted";
minRequiredGen = Math.min(minRequiredGen, Long.parseLong(entry.key.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)));
}
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

Expand Down
13 changes: 6 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -92,7 +91,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -568,7 +566,7 @@ public CommitStats commitStats() {
* @return the sequence number service
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Global stats on segments.
*/
Expand Down Expand Up @@ -859,9 +857,10 @@ public void forceMerge(boolean flush) throws IOException {
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should look in future to remove one of these booleans - either you want to "get everything" or you want a safe commit. I don't think there's a point in "flush but give me a safe commit"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Are you ok if we replace this by having two methods: acquireLastIndexCommit(flushFirst) and acquireSafeIndexCommit(no option)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. Are you ok if we replace this by having two methods: acquireLastIndexCommit(flushFirst) and acquireSafeIndexCommit(no option)?

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double checking - the new methods will be a follow up?


/**
* fail engine due to some error. the engine will also be closed.
Expand Down Expand Up @@ -1437,9 +1436,9 @@ public static class IndexCommitRef implements Closeable {
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;

IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
indexCommit = deletionPolicy.snapshot();
onClose = () -> deletionPolicy.release(indexCommit);
IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
this.indexCommit = indexCommit;
this.onClose = onClose;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public class InternalEngine extends Engine {

private final String uidField;

private final SnapshotDeletionPolicy snapshotDeletionPolicy;
private final CombinedDeletionPolicy combinedDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -184,9 +184,8 @@ public InternalEngine(EngineConfig engineConfig) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
Expand Down Expand Up @@ -1644,20 +1643,15 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
logger.trace("start flush for snapshot");
flush(false, true);
logger.trace("finish flush for snapshot");
}
try (ReleasableLock lock = readLock.acquire()) {
logger.trace("pulling snapshot");
return new IndexCommitRef(snapshotDeletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means a potential change to the exception type. Can you double check it's OK?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ok. The method snapshot of Lucene's SnapshotDeletionPolicy throws IOException but we don't. Acquiring a commit is just increasing refCount of a commit.

}
return combinedDeletionPolicy.acquireIndexCommit(safeCommit);
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down Expand Up @@ -1828,7 +1822,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit start
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,13 +1085,14 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
*
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireIndexCommit(flushFirst);
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
Expand Down Expand Up @@ -1125,7 +1126,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireIndexCommit(false);
indexCommit = engine.acquireIndexCommit(false, false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireIndexCommit(true);
indexCommit = shard.acquireIndexCommit(false, true);
success = true;
} finally {
if (success == false) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ final void ensureOpen() {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
Expand All @@ -270,7 +270,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
phase1Snapshot = shard.acquireIndexCommit(true, false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
Expand Down
Loading