@@ -2354,6 +2354,10 @@ public Translog.Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

public TranslogConfig getTranslogConfig() {
return translogConfig;
}

// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

@@ -21,6 +21,9 @@

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
@@ -46,14 +49,17 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@@ -351,6 +357,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
try {
store.failIfCorrupted();
try {
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
prepareStartingCommitPoint(indexShard);
}
si = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
String files = "_unknown_";
@@ -405,6 +414,50 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
}

/**
* The existing store may retain multiple commit points, which gives us a chance to find a good starting
* commit. All translog files required by a starting commit must still be retained, and, if possible, its
* max sequence number should be at most the global checkpoint read from the translog checkpoint. With a
* good starting commit we may be able to throw away stale operations. Once the starting commit is
* determined, we delete all other files that are not referenced by this commit to prevent potential problems.
*/
private void prepareStartingCommitPoint(IndexShard indexShard) throws IOException {
assert indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE
: "Only call clean unsafe commits when recovering from existing store; recoveryType [" +
indexShard.recoveryState().getRecoverySource().getType() + "]";
final Store store = indexShard.store();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
int startingIndex = commits.size() - 1;
try {
final Path translogPath = indexShard.getTranslogConfig().getTranslogPath();
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath);
final long minRefTranslogGen = Translog.readMinReferencedTranslogGen(translogPath);
startingIndex = indexOfStartingCommit(commits, globalCheckpoint, minRefTranslogGen);
} catch (IOException ex) {
logger.warn(new ParameterizedMessage("Failed to find a safe commit for shard [{}]; falling back to the last commit", shardId), ex);
}
store.pruneUnreferencedFiles(commits.get(startingIndex));
}

private int indexOfStartingCommit(List<IndexCommit> commits, long globalCheckpoint, long minReferencedTranslogGen) throws IOException {
for (int i = commits.size() - 1; i >= 0; i--) {
final Map<String, String> commitData = commits.get(i).getUserData();
// If the translog files required by this commit are no longer retained, we have to use the next younger commit.
if (Long.parseLong(commitData.get(Translog.TRANSLOG_GENERATION_KEY)) < minReferencedTranslogGen) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO.
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return i;
}
// This is a safe commit.
if (Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)) <= globalCheckpoint) {
return i;
}
}
return commits.size() - 1;
}
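
To make the selection rule concrete, here is a small self-contained sketch that mirrors the loop above with hypothetical values (the class, field names, and numbers are made up for illustration, and the 5.x fallback for commits without MAX_SEQ_NO is omitted): given two commits whose (translog generation, max_seq_no) pairs are (2, 3) and (5, 9), a global checkpoint of 3, and a minimum referenced translog generation of 2, the older commit is chosen because its max_seq_no does not exceed the global checkpoint, so the remaining operations can be replayed from the retained translog.

```java
import java.util.Arrays;
import java.util.List;

public class StartingCommitExample {

    /** Stand-in for an IndexCommit's user data; all values below are hypothetical. */
    static final class FakeCommit {
        final long translogGen;
        final long maxSeqNo;
        FakeCommit(long translogGen, long maxSeqNo) {
            this.translogGen = translogGen;
            this.maxSeqNo = maxSeqNo;
        }
    }

    static int indexOfStartingCommit(List<FakeCommit> commits, long globalCheckpoint, long minReferencedTranslogGen) {
        for (int i = commits.size() - 1; i >= 0; i--) {
            FakeCommit commit = commits.get(i);
            if (commit.translogGen < minReferencedTranslogGen) {
                return i + 1; // the translog this commit requires is gone; fall back to the next younger commit
            }
            if (commit.maxSeqNo <= globalCheckpoint) {
                return i;     // safe commit: operations above the global checkpoint can be replayed from the translog
            }
        }
        return commits.size() - 1;
    }

    public static void main(String[] args) {
        // commit 0: translog gen 2, max_seq_no 3; commit 1: translog gen 5, max_seq_no 9
        List<FakeCommit> commits = Arrays.asList(new FakeCommit(2, 3), new FakeCommit(5, 9));
        // global checkpoint 3, minimum referenced translog generation 2 -> commit 0 is the safe starting commit
        System.out.println(indexOfStartingCommit(commits, 3, 2)); // prints 0
    }
}
```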

private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
15 changes: 15 additions & 0 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
@@ -349,6 +350,20 @@ public StoreStats stats() throws IOException {
return statsCache.getOrRefresh();
}

/**
* Removes all existing files in this store that are not referenced by the given commit point.
*/
public void pruneUnreferencedFiles(IndexCommit commit) throws IOException {
metadataLock.writeLock().lock();
try {
Lucene.pruneUnreferencedFiles(commit.getSegmentsFileName(), directory);
assert DirectoryReader.listCommits(directory).size() == 1
: "Should have one commit after pruning, found [" + DirectoryReader.listCommits(directory) + "]";
} finally {
metadataLock.writeLock().unlock();
}
}
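
As a side note, below is a minimal sketch of how a directory could be pruned down to a single commit with plain Lucene APIs. This is an assumption about one plausible mechanism, not a claim about how Lucene.pruneUnreferencedFiles is actually implemented; the class name PruneToCommitSketch and method pruneTo are made up for illustration.

```java
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;

import java.io.IOException;

final class PruneToCommitSketch {

    /**
     * Deletes every file in the directory that is not referenced by the given commit.
     * Opening an IndexWriter on that commit with the default deletion policy
     * (KeepOnlyLastCommitDeletionPolicy) drops the other commit points at open time;
     * rolling back afterwards leaves the chosen commit as the only one.
     */
    static void pruneTo(Directory directory, IndexCommit commit) throws IOException {
        IndexWriterConfig config = new IndexWriterConfig()
            .setIndexCommit(commit)                          // start from the chosen commit point
            .setOpenMode(IndexWriterConfig.OpenMode.APPEND)  // the index must already exist
            .setMergePolicy(NoMergePolicy.INSTANCE)          // no merges while the writer is open
            .setCommitOnClose(false);                        // never create a new commit here
        IndexWriter writer = new IndexWriter(directory, config);
        try {
            // nothing to do: unreferenced files were already deleted when the writer opened
        } finally {
            writer.rollback(); // close the writer without committing
        }
    }
}
```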

/**
* Increments the refCount of this Store instance. RefCounts are used to determine when a
* Store can be closed safely, i.e. as soon as there are no more references. Be sure to always call a
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1703,6 +1703,17 @@ public static final long readGlobalCheckpoint(final Path location) throws IOExce
return readCheckpoint(location).globalCheckpoint;
}

/**
* Reads the minimum referenced translog generation from the translog checkpoint.
*
* @param location the location of the translog
* @return the minimum generation referenced by the translog.
* @throws IOException if an I/O exception occurred reading the checkpoint
*/
public static long readMinReferencedTranslogGen(final Path location) throws IOException {
return readCheckpoint(location).minTranslogGeneration;
}

/**
* Returns the translog uuid used to associate a lucene index with a translog.
*/
@@ -19,8 +19,6 @@
package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
@@ -58,7 +56,6 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -87,12 +84,8 @@
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@@ -1237,6 +1230,7 @@ public void testRefreshMetric() throws IOException {
public void testIndexingOperationsListeners() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
shard.updateLocalCheckpointForShard(shard.shardRouting.allocationId().getId(), 0);
AtomicInteger preIndex = new AtomicInteger();
AtomicInteger postIndexCreate = new AtomicInteger();
AtomicInteger postIndexUpdate = new AtomicInteger();
@@ -1545,28 +1539,50 @@ protected void doRun() throws Exception {
}

public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
/*
* The flow of this test:
* - delete #1
* - roll generation (to create gen 2)
* - index #0
* - index #3
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
* - index #2
* - index #5
* - if we flush again and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed.
*/
final IndexShard shard = newStartedShard(false);
final Consumer<Mapping> mappingConsumer = getMappingUpdater(shard, "test");
// delete #1
shard.applyDeleteOperationOnReplica(1, 2, "test", "id", VersionType.EXTERNAL, mappingConsumer);
shard.getEngine().rollTranslogGeneration(); // isolate the delete in its own generation
// index #0
shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);

// index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
// around
shard.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
// index #3
shard.applyIndexOperationOnReplica(3, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id-3", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
// Flushing a new commit with local checkpoint=1 allows us to skip translog gen #1 during recovery.
shard.updateGlobalCheckpointOnReplica(1, "test");
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
shard.applyIndexOperationOnReplica(2, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id-2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
// index #5
shard.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id-5", new BytesArray("{}"), XContentType.JSON), mappingConsumer);

final int translogOps;
if (randomBoolean()) {
// Advancing the global checkpoint allows us to recover from the second commit rather than the first one.
shard.updateGlobalCheckpointOnReplica(3, "test");
logger.info("--> flushing shard");
flushShard(shard);
translogOps = 2;
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
translogOps = 4; // delete #1 won't be replayed.
} else if (randomBoolean()) {
shard.getEngine().rollTranslogGeneration();
translogOps = 3;
translogOps = 5;
} else {
translogOps = 3;
translogOps = 5;
}

final ShardRouting replicaRouting = shard.routingEntry();
@@ -1581,7 +1597,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
assertDocCount(newShard, 1);
assertDocCount(newShard, 3);
closeShards(newShard);
}

Expand All @@ -1591,6 +1607,7 @@ public void testRecoverFromStore() throws IOException {
int translogOps = totalOps;
for (int i = 0; i < totalOps; i++) {
indexDoc(shard, "test", Integer.toString(i));
shard.updateLocalCheckpointForShard(shard.shardRouting.allocationId().getId(), i);
}
if (randomBoolean()) {
flushShard(shard);