Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -65,18 +66,19 @@ public class TransportNodesListGatewayStartedShards extends

public static final String ACTION_NAME = "internal:gateway/local/started_shards";
private final NodeEnvironment nodeEnv;

private final IndicesService indicesService;

@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NodeEnvironment env) {
NodeEnvironment env, IndicesService indicesService) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED,
NodeGatewayStartedShards.class);
this.nodeEnv = env;
this.indicesService = indicesService;
}

@Override
Expand Down Expand Up @@ -127,21 +129,24 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
throw e;
}

ShardPath shardPath = null;
try {
IndexSettings indexSettings = new IndexSettings(metaData, settings);
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
if (indicesService.getShardOrNull(shardId) == null) {
// we don't have an open shard on the store, validate the files on disk are openable
ShardPath shardPath = null;
try {
IndexSettings indexSettings = new IndexSettings(metaData, settings);
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary, exception);
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger);
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary, exception);
}

logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -403,8 +404,10 @@ private void closeInternal() {
*
* @throws IOException if the index we try to read is corrupted
*/
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, ShardLocker shardLocker,
ESLogger logger) throws IOException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
return new MetadataSnapshot(null, dir, logger);
} catch (IndexNotFoundException ex) {
Expand All @@ -420,9 +423,9 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId
* can be successfully opened. This includes reading the segment infos and possible
* corruption markers.
*/
public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId) throws IOException {
public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId, ShardLocker shardLocker) throws IOException {
try {
tryOpenIndex(indexLocation, shardId, logger);
tryOpenIndex(indexLocation, shardId, shardLocker, logger);
} catch (Exception ex) {
logger.trace("Can't open index for path [{}]", ex, indexLocation);
return false;
Expand All @@ -435,8 +438,9 @@ public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId
* segment infos and possible corruption markers. If the index can not
* be opened, an exception is thrown
*/
public static void tryOpenIndex(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
public static void tryOpenIndex(Path indexLocation, ShardId shardId, ShardLocker shardLocker, ESLogger logger) throws IOException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
Expand Down Expand Up @@ -1398,4 +1402,13 @@ private static long estimateSize(Directory directory) throws IOException {
}
}

/**
* A shard lock supplier that is used by the static methods on this class. Normal methods rely on
* the shard lock passed to the constructor.
*/
@FunctionalInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this into NodeEnv instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

public interface ShardLocker {

ShardLock lock(ShardId shardId, long lockTimeoutMS) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
if (shardPath == null) {
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
}
return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, logger));
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the master will not use a copy of this replica
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to add a 2) here?

Copy link
Contributor Author

@bleskes bleskes Jul 14, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe, yeah - ADD for the works :)
2) A shard is shutting down and has not cleared its content within the lock timeout. In this case the master may not reuse local resources.

return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId,
nodeEnv::shardLock, logger));
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,6 @@
*/
package org.elasticsearch.index.shard;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
Expand Down Expand Up @@ -134,6 +113,27 @@
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
Expand Down Expand Up @@ -253,7 +253,8 @@ public void testFailShard() throws Exception {
ShardPath shardPath = ShardPath.loadShardPath(logger, env, shard.shardId(), test.getIndexSettings());
assertNotNull(shardPath);
// but index can't be opened for a failed shard
assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId()), equalTo(false));
assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(), env::shardLock),
equalTo(false));
}

ShardStateMetaData getShardStateMetadata(IndexShard shard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,14 +1000,14 @@ public void testCanOpenIndex() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
Path tempDir = createTempDir();
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
assertFalse(Store.canOpenIndex(logger, tempDir,shardId));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
writer.close();
assertTrue(Store.canOpenIndex(logger, tempDir, shardId));
assertTrue(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));

DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
@Override
Expand All @@ -1022,7 +1022,7 @@ public Directory newDirectory() throws IOException {
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
store.close();
}

Expand Down