Skip to content

Commit

Permalink
Fail shard if IndexShard#storeStats runs into an IOException (#32241)
Browse files Browse the repository at this point in the history
Fail shard if IndexShard#storeStats runs into an IOException. Closes #29008
(cherry picked from commit 33f11e6)
  • Loading branch information
andrershov authored and Andrey Ershov committed Jul 23, 2018
1 parent 864711b commit d49f2c4
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ public StoreStats storeStats() {
try {
return store.stats();
} catch (IOException e) {
failShard("Failing shard because of exception during storeStats", e);
throw new ElasticsearchException("io exception while building 'store stats'", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -111,6 +113,7 @@
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.ElasticsearchException;

import java.io.IOException;
import java.nio.charset.Charset;
Expand All @@ -137,6 +140,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -1161,6 +1165,81 @@ public void testShardStats() throws IOException {
closeShards(shard);
}


public void testShardStatsWithFailures() throws IOException {
allowShardFailures();
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING);
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());


ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
.primaryTerm(0, 1)
.build();

// Override two Directory methods to make them fail at our will
// We use AtomicReference here to inject failure in the middle of the test not immediately
// We use Supplier<IOException> instead of IOException to produce meaningful stacktrace
// (remember stack trace is filled when exception is instantiated)
AtomicReference<Supplier<IOException>> exceptionToThrow = new AtomicReference<>();
AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false);
Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) {
//fileLength method is called during storeStats try block
//it's not called when store is marked as corrupted
@Override
public long fileLength(String name) throws IOException {
Supplier<IOException> ex = exceptionToThrow.get();
if (ex == null) {
return super.fileLength(name);
} else {
throw ex.get();
}
}

//listAll method is called when marking store as corrupted
@Override
public String[] listAll() throws IOException {
Supplier<IOException> ex = exceptionToThrow.get();
if (throwWhenMarkingStoreCorrupted.get() && ex != null) {
throw ex.get();
} else {
return super.listAll();
}
}
};

try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
null, new InternalEngineFactory(), () -> {
}, EMPTY_EVENT_LISTENER);
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true));

recoverShardFromStore(shard);

final boolean corruptIndexException = randomBoolean();

if (corruptIndexException) {
exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource"));
throwWhenMarkingStoreCorrupted.set(randomBoolean());
} else {
exceptionToThrow.set(() -> new IOException("Test IOException"));
}
ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats);
assertTrue(failureCallbackTriggered.get());

if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) {
assertTrue(store.isMarkedCorrupted());
}
}
}

public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery
Expand Down Expand Up @@ -1867,6 +1946,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
wrapper,
new InternalEngineFactory(),
() -> {},
Expand Down Expand Up @@ -2018,6 +2098,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
wrapper,
new InternalEngineFactory(),
() -> {},
Expand Down Expand Up @@ -2504,7 +2585,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
Expand Down Expand Up @@ -2861,7 +2942,7 @@ public void testFlushOnInactive() throws Exception {
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
AtomicBoolean markedInactive = new AtomicBoolean();
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> {
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> {
}, new IndexEventListener() {
@Override
public void onShardInactive(IndexShard indexShard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
null,
new InternalEngineFactory(),
() -> {},
EMPTY_EVENT_LISTENER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ assert shardRoutings().stream()

public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
nodeId,
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);
shardId,
nodeId,
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);

final IndexShard newReplica =
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting),
() -> {}, EMPTY_EVENT_LISTENER);
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,20 @@ public Settings threadPoolSettings() {
return Settings.EMPTY;
}

private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
final ShardId shardId = shardPath.getShardId();

protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
}

protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return newFSDirectory(shardPath.resolveIndex());
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));

}

/**
Expand Down Expand Up @@ -284,29 +289,32 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
final ShardId shardId = routing.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
EMPTY_EVENT_LISTENER, listeners);
}

/**
* creates a new initializing shard.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index even listener
* @param listeners an optional set of listeners to add to the shard
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param store an optional custom store to use. If null a default file based store will be created
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
final IndexShard indexShard;
final Store store = createStore(indexSettings, shardPath);
if (store == null) {
store = createStore(indexSettings, shardPath);
}
boolean success = false;
try {
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
Expand Down Expand Up @@ -357,6 +365,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
current.shardPath(),
current.indexSettings().getIndexMetaData(),
null,
null,
current.engineFactory,
current.getGlobalCheckpointSyncer(),
EMPTY_EVENT_LISTENER, listeners);
Expand Down

0 comments on commit d49f2c4

Please sign in to comment.