diff --git a/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 9af22324fd96a..0e4f83b625dbc 100644 --- a/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -633,6 +633,15 @@ protected void closeInternal() { }; } + /** + * A functional interface that people can use to reference {@link #shardLock(ShardId, long)} + */ + @FunctionalInterface + public interface ShardLocker { + + ShardLock lock(ShardId shardId, long lockTimeoutMS) throws IOException; + } + /** * Returns all currently lock shards. * diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index 5184496cb1c71..fc23ef13581df 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardStateMetaData; import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -65,18 +66,19 @@ public class TransportNodesListGatewayStartedShards extends public static final String ACTION_NAME = "internal:gateway/local/started_shards"; private final NodeEnvironment nodeEnv; - + private final IndicesService indicesService; @Inject public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - NodeEnvironment env) { + NodeEnvironment env, IndicesService indicesService) { super(settings, 
ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED, NodeGatewayStartedShards.class); this.nodeEnv = env; + this.indicesService = indicesService; } @Override @@ -127,21 +129,24 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { throw e; } - ShardPath shardPath = null; - try { - IndexSettings indexSettings = new IndexSettings(metaData, settings); - shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); - if (shardPath == null) { - throw new IllegalStateException(shardId + " no shard path found"); + if (indicesService.getShardOrNull(shardId) == null) { + // we don't have an open shard on the store, validate the files on disk are openable + ShardPath shardPath = null; + try { + IndexSettings indexSettings = new IndexSettings(metaData, settings); + shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); + if (shardPath == null) { + throw new IllegalStateException(shardId + " no shard path found"); + } + Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + } catch (Exception exception) { + logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, + shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : ""); + String allocationId = shardStateMetaData.allocationId != null ? + shardStateMetaData.allocationId.getId() : null; + return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, + allocationId, shardStateMetaData.primary, exception); } - Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger); - } catch (Exception exception) { - logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, - shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : ""); - String allocationId = shardStateMetaData.allocationId != null ? 
- shardStateMetaData.allocationId.getId() : null; - return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, - allocationId, shardStateMetaData.primary, exception); } logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData); diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 979c834ccc965..82c6a2db6d7ca 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -68,6 +68,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -88,6 +89,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.CRC32; @@ -403,8 +405,10 @@ private void closeInternal() { * * @throws IOException if the index we try to read is corrupted */ - public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException { - try (Directory dir = new SimpleFSDirectory(indexLocation)) { + public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, + ESLogger logger) throws IOException { + try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5)); + Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, shardId); return new MetadataSnapshot(null, dir, logger); } catch (IndexNotFoundException ex) { @@ -420,9 +424,9 
@@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId * can be successfully opened. This includes reading the segment infos and possible * corruption markers. */ - public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId) throws IOException { + public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker) throws IOException { try { - tryOpenIndex(indexLocation, shardId, logger); + tryOpenIndex(indexLocation, shardId, shardLocker, logger); } catch (Exception ex) { logger.trace("Can't open index for path [{}]", ex, indexLocation); return false; @@ -435,8 +439,9 @@ public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId * segment infos and possible corruption markers. If the index can not * be opened, an exception is thrown */ - public static void tryOpenIndex(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException { - try (Directory dir = new SimpleFSDirectory(indexLocation)) { + public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, ESLogger logger) throws IOException { + try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5)); + Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, shardId); SegmentInfos segInfo = Lucene.readSegmentInfos(dir); logger.trace("{} loaded segment info [{}]", shardId, segInfo); diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index a4517baf45b99..cdc95e1895ac0 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -150,7 +150,12 @@ private StoreFilesMetaData 
listStoreMetaData(ShardId shardId) throws IOException if (shardPath == null) { return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); } - return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, logger)); + // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: + // 1) a shard is being constructed, which means the master will not use a copy of this replica + // 2) A shard is shutting down and has not cleared its content within lock timeout. In this case the master may not + // reuse local resources. + return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, + nodeEnv::shardLock, logger)); } finally { TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); if (exists) { diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 050fa0430baf5..7d74c7ec2f090 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -442,7 +442,6 @@ public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception { */ @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE," + "indices.recovery:TRACE,indices.cluster:TRACE") - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/19416") public void testAckedIndexing() throws Exception { final int seconds = !(TEST_NIGHTLY && rarely()) ? 
1 : 5; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b2b6a657fda96..95a705f8e27c0 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,27 +18,6 @@ */ package org.elasticsearch.index.shard; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; @@ -134,6 +113,27 @@ import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -253,7 +253,8 @@ public void testFailShard() throws Exception { ShardPath shardPath = ShardPath.loadShardPath(logger, env, shard.shardId(), test.getIndexSettings()); assertNotNull(shardPath); // but index can't be opened for a failed shard - assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId()), equalTo(false)); + assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(), env::shardLock), + equalTo(false)); } ShardStateMetaData getShardStateMetadata(IndexShard shard) { diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java index e40f1c7f06fbd..e1636b713a1c9 100644 --- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -1000,14 +1000,14 @@ public void testCanOpenIndex() throws IOException { IndexWriterConfig iwc = newIndexWriterConfig(); Path tempDir = createTempDir(); final BaseDirectoryWrapper dir = newFSDirectory(tempDir); - assertFalse(Store.canOpenIndex(logger, tempDir,shardId)); + assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id))); IndexWriter writer = new IndexWriter(dir, iwc); Document doc = new Document(); doc.add(new StringField("id", "1", random().nextBoolean() ? 
Field.Store.YES : Field.Store.NO)); writer.addDocument(doc); writer.commit(); writer.close(); - assertTrue(Store.canOpenIndex(logger, tempDir, shardId)); + assertTrue(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id))); DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) { @Override @@ -1022,7 +1022,7 @@ public Directory newDirectory() throws IOException { }; Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId)); store.markStoreCorrupted(new CorruptIndexException("foo", "bar")); - assertFalse(Store.canOpenIndex(logger, tempDir, shardId)); + assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id))); store.close(); }