-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fail shard if IndexShard#storeStats runs into an IOException #32241
Changes from 2 commits
4aaec57
10b83f9
8e1cbaa
6a32438
2576462
f76d924
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -917,6 +917,13 @@ public StoreStats storeStats() { | |
try { | ||
return store.stats(); | ||
} catch (IOException e) { | ||
failShard("Failing shard because of exception during storeState " + e.getMessage(), e); | ||
//TODO should close method be called inside failShard? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is confusing. Currently it ends up being closed by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bleskes do you mean that in real application (not in unit tests), IndiciesClusterStateService will realize that shard has failed and will close the shard? That's why close call would no longer be needed. |
||
try { | ||
close("Closing shard because of exception during storeStats " + e.getMessage(), false); | ||
} catch (IOException e1) { | ||
logger.warn("Error closing shard"); | ||
} | ||
throw new ElasticsearchException("io exception while building 'store stats'", e); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
import org.apache.lucene.search.TermQuery; | ||
import org.apache.lucene.search.TopDocs; | ||
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.FilterDirectory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.util.Constants; | ||
import org.elasticsearch.Version; | ||
|
@@ -112,6 +114,7 @@ | |
import org.elasticsearch.test.FieldMaskingReader; | ||
import org.elasticsearch.test.VersionUtils; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.ElasticsearchException; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
|
@@ -138,6 +141,7 @@ | |
import java.util.function.BiConsumer; | ||
import java.util.function.Consumer; | ||
import java.util.function.LongFunction; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
|
@@ -1162,6 +1166,83 @@ public void testShardStats() throws IOException { | |
closeShards(shard); | ||
} | ||
|
||
|
||
public void testShardStatsWithFailures() throws IOException { | ||
allowShardFailures(); | ||
final ShardId shardId = new ShardId("index", "_na_", 0); | ||
final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING); | ||
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); | ||
|
||
|
||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); | ||
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) | ||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) | ||
.build(); | ||
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName()) | ||
.settings(settings) | ||
.primaryTerm(0, 1) | ||
.build(); | ||
|
||
// Override one Directory methods to make it fail at our will | ||
// We use AtomicReference here to inject failure in the middle of the test not immediately | ||
// We use Supplier<IOException> instead of IOException to produce meaningful stacktrace | ||
// (remember stack trace is filled when exception is instantiated) | ||
AtomicReference<Supplier<IOException>> exceptionToThrow = new AtomicReference<>(); | ||
AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false); | ||
Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) { | ||
//fileLength method is called during storeStats try block | ||
//it's not called when store is marked as corrupted | ||
@Override | ||
public long fileLength(String name) throws IOException { | ||
Supplier<IOException> ex = exceptionToThrow.get(); | ||
if (ex == null) { | ||
return super.fileLength(name); | ||
} else { | ||
throw ex.get(); | ||
} | ||
} | ||
|
||
//listAll method is called when marking store as corrupted | ||
@Override | ||
public String[] listAll() throws IOException { | ||
Supplier<IOException> ex = exceptionToThrow.get(); | ||
if (throwWhenMarkingStoreCorrupted.get() && ex != null) { | ||
throw ex.get(); | ||
} else { | ||
return super.listAll(); | ||
} | ||
} | ||
}; | ||
|
||
Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory); | ||
IndexShard shard = newShard(shardRouting, shardPath, metaData, store, | ||
null, new InternalEngineFactory(), () -> {}, EMPTY_EVENT_LISTENER); | ||
|
||
recoverShardFromStore(shard); | ||
boolean corruptIndexException = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: make this final and set it within the relevant if close? that way, you can't change it again by mistake. |
||
|
||
if (randomBoolean()) { | ||
exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💯 for checking this. |
||
throwWhenMarkingStoreCorrupted.set(randomBoolean()); | ||
} else { | ||
corruptIndexException = false; | ||
exceptionToThrow.set(() -> new IOException("Test IOException")); | ||
} | ||
ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats); | ||
assertThat(shard.state(), equalTo(IndexShardState.CLOSED)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can register a shard failure event listener and go with that for now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bleskes You mean, that I can just check if shard failure event listener has triggered instead of checking shard state? If I remove shard closing from implementation, then yes, actually shard state would be opened. It was in original proposed patch by you, that's why I've included logic to close the shard. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yes
I told you I wasn't sure of it's quality :) |
||
if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) { | ||
assertTrue(store.isMarkedCorrupted()); | ||
} | ||
|
||
//TODO Should it be invoked when closing the shard? | ||
try { | ||
directory.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bleskes what do you think about explicitly closing the directory here? I've added this line, because w/o it test fails and reports unclosed resources. But is not it a shard that is responsible for directory lifecycle? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's closed by the store, which is, in turn, closed in w.r.t to this test - the easiest is to wrap the store you create in a try with resources block, so it will be always closed. |
||
} catch (IOException expected){ | ||
//expected | ||
} | ||
} | ||
|
||
public void testRefreshMetric() throws IOException { | ||
IndexShard shard = newStartedShard(); | ||
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery | ||
|
@@ -1868,6 +1949,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { | |
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), | ||
shard.shardPath(), | ||
shard.indexSettings().getIndexMetaData(), | ||
null, | ||
wrapper, | ||
new InternalEngineFactory(), | ||
() -> {}, | ||
|
@@ -2020,7 +2102,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { | |
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), | ||
shard.shardPath(), | ||
shard.indexSettings().getIndexMetaData(), | ||
wrapper, | ||
null, wrapper, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is probably better on a separate line like the rest of the parameters. |
||
new InternalEngineFactory(), | ||
() -> {}, | ||
EMPTY_EVENT_LISTENER); | ||
|
@@ -2506,7 +2588,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { | |
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix"))) | ||
.build(); | ||
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, | ||
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
|
||
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); | ||
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); | ||
|
@@ -3005,7 +3087,7 @@ public void testFlushOnInactive() throws Exception { | |
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); | ||
AtomicBoolean markedInactive = new AtomicBoolean(); | ||
AtomicReference<IndexShard> primaryRef = new AtomicReference<>(); | ||
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> { | ||
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { | ||
}, new IndexEventListener() { | ||
@Override | ||
public void onShardInactive(IndexShard indexShard) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,7 +104,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { | |
shardRouting, | ||
shard.shardPath(), | ||
shard.indexSettings().getIndexMetaData(), | ||
null, | ||
null, null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is probably better on a separate line like the rest of the parameters. |
||
new InternalEngineFactory(), | ||
() -> {}, | ||
EMPTY_EVENT_LISTENER); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,15 +163,21 @@ public Settings threadPoolSettings() { | |
return Settings.EMPTY; | ||
} | ||
|
||
private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { | ||
final ShardId shardId = shardPath.getShardId(); | ||
|
||
protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { | ||
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex())); | ||
} | ||
|
||
protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException { | ||
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { | ||
@Override | ||
public Directory newDirectory() throws IOException { | ||
return newFSDirectory(shardPath.resolveIndex()); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the extra line? |
||
return directory; | ||
} | ||
}; | ||
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); | ||
|
||
} | ||
|
||
/** | ||
|
@@ -284,29 +290,32 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, | |
final ShardId shardId = routing.shardId(); | ||
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); | ||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); | ||
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, | ||
return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, | ||
EMPTY_EVENT_LISTENER, listeners); | ||
} | ||
|
||
/** | ||
* creates a new initializing shard. | ||
* @param routing shard routing to use | ||
* @param shardPath path to use for shard data | ||
* @param indexMetaData indexMetaData for the shard, including any mapping | ||
* @param indexSearcherWrapper an optional wrapper to be used during searchers | ||
* @param globalCheckpointSyncer callback for syncing global checkpoints | ||
* @param indexEventListener index even listener | ||
* @param listeners an optional set of listeners to add to the shard | ||
* @param routing shard routing to use | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. formatting |
||
* @param shardPath path to use for shard data | ||
* @param indexMetaData indexMetaData for the shard, including any mapping | ||
* @param store an optional custom store to use. If null a default file based store will be created | ||
* @param indexSearcherWrapper an optional wrapper to be used during searchers | ||
* @param globalCheckpointSyncer callback for syncing global checkpoints | ||
* @param indexEventListener index event listener | ||
* @param listeners an optional set of listeners to add to the shard | ||
*/ | ||
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, | ||
@Nullable IndexSearcherWrapper indexSearcherWrapper, | ||
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper, | ||
@Nullable EngineFactory engineFactory, | ||
Runnable globalCheckpointSyncer, | ||
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException { | ||
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); | ||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); | ||
final IndexShard indexShard; | ||
final Store store = createStore(indexSettings, shardPath); | ||
if (store == null) { | ||
store = createStore(indexSettings, shardPath); | ||
} | ||
boolean success = false; | ||
try { | ||
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); | ||
|
@@ -356,7 +365,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index | |
routing, | ||
current.shardPath(), | ||
current.indexSettings().getIndexMetaData(), | ||
null, | ||
null, null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is probably better on a separate line like the rest of the parameters. |
||
current.engineFactory, | ||
current.getGlobalCheckpointSyncer(), | ||
EMPTY_EVENT_LISTENER, listeners); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should say
storeStats
, notstoreState
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more thing, looking at the implementation of
failShard
, we don't need to appende.getMessage()
here.