From d49f2c4bfdcf180d790cf2fe8ad5e28ec08aae24 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 23 Jul 2018 16:38:55 +0200 Subject: [PATCH] Fail shard if IndexShard#storeStats runs into an IOException (#32241) Fail shard if IndexShard#storeStats runs into an IOException. Closes #29008 (cherry picked from commit 33f11e637dc05b5112ffc8c1ca95000a4782fa6d) --- .../elasticsearch/index/shard/IndexShard.java | 1 + .../index/shard/IndexShardTests.java | 85 ++++++++++++++++++- .../BlobStoreRepositoryRestoreTests.java | 1 + .../ESIndexLevelReplicationTestCase.java | 11 +-- .../index/shard/IndexShardTestCase.java | 35 +++++--- 5 files changed, 113 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e62f1ed040124..2e2c3115fd70d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -949,6 +949,7 @@ public StoreStats storeStats() { try { return store.stats(); } catch (IOException e) { + failShard("Failing shard because of exception during storeStats", e); throw new ElasticsearchException("io exception while building 'store stats'", e); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6cf90ab2c3580..ab76e45149578 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -27,6 +27,8 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; import org.elasticsearch.Version; @@ -111,6 +113,7 @@ import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.ElasticsearchException; import java.io.IOException; import java.nio.charset.Charset; @@ -137,6 +140,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongFunction; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1161,6 +1165,81 @@ public void testShardStats() throws IOException { closeShards(shard); } + + public void testShardStatsWithFailures() throws IOException { + allowShardFailures(); + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING); + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); + + + ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName()) + .settings(settings) + .primaryTerm(0, 1) + .build(); + + // Override two Directory methods to make them fail at our will + // We use AtomicReference here to inject failure in the middle of the test not immediately + // We use Supplier instead of IOException to produce meaningful stacktrace + // (remember stack trace is filled when exception is instantiated) + AtomicReference> exceptionToThrow = new AtomicReference<>(); + AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false); + Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) { + //fileLength method is called during storeStats try block + //it's not called when store is marked as corrupted + @Override + public long fileLength(String name) throws IOException { + Supplier ex = exceptionToThrow.get(); + if (ex == null) { + return super.fileLength(name); + } else { + throw ex.get(); + } + } + + //listAll method is called when marking store as corrupted + @Override + public String[] listAll() throws IOException { + Supplier ex = exceptionToThrow.get(); + if (throwWhenMarkingStoreCorrupted.get() && ex != null) { + throw ex.get(); + } else { + return super.listAll(); + } + } + }; + + try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) { + IndexShard shard = newShard(shardRouting, shardPath, metaData, store, + null, new InternalEngineFactory(), () -> { + }, EMPTY_EVENT_LISTENER); + AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); + shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true)); + + recoverShardFromStore(shard); + + final boolean corruptIndexException = randomBoolean(); + + if (corruptIndexException) { + exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource")); + throwWhenMarkingStoreCorrupted.set(randomBoolean()); + } else { + exceptionToThrow.set(() -> new IOException("Test IOException")); + } + ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats); + assertTrue(failureCallbackTriggered.get()); + + if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) { + assertTrue(store.isMarkedCorrupted()); + } + } + } + public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery @@ -1867,6 +1946,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), shard.shardPath(), shard.indexSettings().getIndexMetaData(), + null, wrapper, new InternalEngineFactory(), () -> {}, @@ -2018,6 +2098,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), shard.shardPath(), shard.indexSettings().getIndexMetaData(), + null, wrapper, new InternalEngineFactory(), () -> {}, @@ -2504,7 +2585,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix"))) .build(); final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, - null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); @@ -2861,7 +2942,7 @@ public void testFlushOnInactive() throws Exception { ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); AtomicBoolean markedInactive = new AtomicBoolean(); AtomicReference primaryRef = new AtomicReference<>(); - IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> { + IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { }, new IndexEventListener() { @Override public void onShardInactive(IndexShard indexShard) { diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 0eae9a1420068..fa7de2d629112 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -105,6 +105,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, + null, new InternalEngineFactory(), () -> {}, EMPTY_EVENT_LISTENER); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index cfa2e49929949..c1e3fb90c514a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -259,13 +259,14 @@ assert shardRoutings().stream() public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException { final ShardRouting shardRouting = TestShardRouting.newShardRouting( - shardId, - nodeId, - false, ShardRoutingState.INITIALIZING, - RecoverySource.PeerRecoverySource.INSTANCE); + shardId, + nodeId, + false, ShardRoutingState.INITIALIZING, + RecoverySource.PeerRecoverySource.INSTANCE); final IndexShard newReplica = - newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER); + newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting), + () -> {}, EMPTY_EVENT_LISTENER); replicas.add(newReplica); updateAllocationIDsOnPrimary(); return newReplica; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 889de5d274f34..f2bd23257fd5a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -163,15 +163,20 @@ public Settings threadPoolSettings() { return Settings.EMPTY; } - private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { - final ShardId shardId = shardPath.getShardId(); + + protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { + return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex())); + } + + protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException { final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { @Override public Directory newDirectory() throws IOException { - return newFSDirectory(shardPath.resolveIndex()); + return directory; } }; return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } /** @@ -284,29 +289,32 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, final ShardId shardId = routing.shardId(); final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, + return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, EMPTY_EVENT_LISTENER, listeners); } /** * creates a new initializing shard. - * @param routing shard routing to use - * @param shardPath path to use for shard data - * @param indexMetaData indexMetaData for the shard, including any mapping - * @param indexSearcherWrapper an optional wrapper to be used during searchers - * @param globalCheckpointSyncer callback for syncing global checkpoints - * @param indexEventListener index even listener - * @param listeners an optional set of listeners to add to the shard + * @param routing shard routing to use + * @param shardPath path to use for shard data + * @param indexMetaData indexMetaData for the shard, including any mapping + * @param store an optional custom store to use. If null a default file based store will be created + * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param globalCheckpointSyncer callback for syncing global checkpoints + * @param indexEventListener index event listener + * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, - @Nullable IndexSearcherWrapper indexSearcherWrapper, + @Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, Runnable globalCheckpointSyncer, IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); final IndexShard indexShard; - final Store store = createStore(indexSettings, shardPath); + if (store == null) { + store = createStore(indexSettings, shardPath); + } boolean success = false; try { IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); @@ -357,6 +365,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index current.shardPath(), current.indexSettings().getIndexMetaData(), null, + null, current.engineFactory, current.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER, listeners);