Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 34 additions & 25 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@

package org.elasticsearch.env;

import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
Expand All @@ -34,22 +30,22 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -63,6 +59,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
Expand All @@ -74,6 +71,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -84,6 +82,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;

Expand Down Expand Up @@ -440,7 +440,7 @@ private static String toString(Collection<String> items) {
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
final Path[] paths = availableShardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
try (ShardLock lock = shardLock(shardId)) {
try (ShardLock lock = shardLock(shardId, "shard deletion under lock")) {
deleteShardDirectoryUnderLock(lock, indexSettings);
}
}
Expand Down Expand Up @@ -532,7 +532,7 @@ private static boolean assertPathsDoNotExist(final Path[] paths) {

private boolean isShardLocked(ShardId id) {
try {
shardLock(id, 0).close();
shardLock(id, "checking if shard is locked").close();
return false;
} catch (ShardLockObtainFailedException ex) {
return true;
Expand All @@ -551,7 +551,7 @@ private boolean isShardLocked(ShardId id) {
*/
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
throws IOException, ShardLockObtainFailedException {
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, "deleting index directory", lockTimeoutMS);
try {
deleteIndexDirectoryUnderLock(index, indexSettings);
} finally {
Expand Down Expand Up @@ -586,7 +586,8 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
* @param lockTimeoutMS how long to wait, in milliseconds, to acquire the index's shard locks
* @return the {@link ShardLock} instances for this index.
*/
public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS) throws ShardLockObtainFailedException {
public List<ShardLock> lockAllForIndex(final Index index, final IndexSettings settings,
final String lockDetails, final long lockTimeoutMS) throws ShardLockObtainFailedException {
final int numShards = settings.getNumberOfShards();
if (numShards <= 0) {
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
Expand All @@ -598,7 +599,7 @@ public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long
try {
for (int i = 0; i < numShards; i++) {
long timeoutLeftMS = Math.max(0, lockTimeoutMS - TimeValue.nsecToMSec((System.nanoTime() - startTimeNS)));
allLocks.add(shardLock(new ShardId(index, i), timeoutLeftMS));
allLocks.add(shardLock(new ShardId(index, i), lockDetails, timeoutLeftMS));
}
success = true;
} finally {
Expand All @@ -619,10 +620,11 @@ public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long
* Note: this method will return immediately if the lock can't be acquired.
*
* @param id the shard ID to lock
* @param details information about why the shard is being locked
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
*/
public ShardLock shardLock(ShardId id) throws ShardLockObtainFailedException {
return shardLock(id, 0);
public ShardLock shardLock(ShardId id, final String details) throws ShardLockObtainFailedException {
return shardLock(id, details, 0);
}

/**
Expand All @@ -631,11 +633,13 @@ public ShardLock shardLock(ShardId id) throws ShardLockObtainFailedException {
* or recover from a different shard instance into it. If the shard lock cannot be acquired,
* a {@link ShardLockObtainFailedException} is thrown.
* @param shardId the shard ID to lock
* @param details information about why the shard is being locked
* @param lockTimeoutMS the lock timeout in milliseconds
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
*/
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException {
logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS);
public ShardLock shardLock(final ShardId shardId, final String details,
final long lockTimeoutMS) throws ShardLockObtainFailedException {
logger.trace("acquiring node shardlock on [{}], timeout [{}], details [{}]", shardId, lockTimeoutMS, details);
final InternalShardLock shardLock;
final boolean acquired;
synchronized (shardLocks) {
Expand All @@ -644,15 +648,15 @@ public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws Sha
shardLock.incWaitCount();
acquired = false;
} else {
shardLock = new InternalShardLock(shardId);
shardLock = new InternalShardLock(shardId, details);
shardLocks.put(shardId, shardLock);
acquired = true;
}
}
if (acquired == false) {
boolean success = false;
try {
shardLock.acquire(lockTimeoutMS);
shardLock.acquire(lockTimeoutMS, details);
success = true;
} finally {
if (success == false) {
Expand All @@ -671,11 +675,11 @@ protected void closeInternal() {
}

/**
* A functional interface that people can use to reference {@link #shardLock(ShardId, long)}
* A functional interface that people can use to reference {@link #shardLock(ShardId, String, long)}
*/
@FunctionalInterface
public interface ShardLocker {
ShardLock lock(ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException;
ShardLock lock(ShardId shardId, String lockDetails, long lockTimeoutMS) throws ShardLockObtainFailedException;
}

/**
Expand All @@ -698,11 +702,13 @@ private final class InternalShardLock {
*/
private final Semaphore mutex = new Semaphore(1);
private int waitCount = 1; // guarded by shardLocks
private String lockDetails;
private final ShardId shardId;

InternalShardLock(ShardId shardId) {
InternalShardLock(final ShardId shardId, final String details) {
this.shardId = shardId;
mutex.acquireUninterruptibly();
lockDetails = details;
}

protected void release() {
Expand Down Expand Up @@ -730,11 +736,14 @@ private void decWaitCount() {
}
}

void acquire(long timeoutInMillis) throws ShardLockObtainFailedException {
void acquire(long timeoutInMillis, final String details) throws ShardLockObtainFailedException {
try {
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS)) {
lockDetails = details;
} else {
throw new ShardLockObtainFailedException(shardId,
"obtaining shard lock timed out after " + timeoutInMillis + "ms");
"obtaining shard lock timed out after " + timeoutInMillis + "ms, previous lock details: [" + lockDetails +
"] trying to lock for [" + details + "]");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public synchronized IndexShard createShard(
IndexShard indexShard = null;
ShardLock lock = null;
try {
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
lock = nodeEnv.shardLock(shardId, "shard creation", TimeUnit.SECONDS.toMillis(5));
eventListener.beforeIndexShardCreated(shardId, indexSettings);
ShardPath path;
try {
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ private void closeInternal() {
*/
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
Logger logger) throws IOException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
try (ShardLock lock = shardLocker.lock(shardId, "read metadata snapshot", TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
return new MetadataSnapshot(null, dir, logger);
Expand All @@ -457,7 +457,7 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId
*/
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
Logger logger) throws IOException, ShardLockObtainFailedException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
try (ShardLock lock = shardLocker.lock(shardId, "open index", TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ public void processPendingDeletes(Index index, IndexSettings indexSettings, Time
throws IOException, InterruptedException, ShardLockObtainFailedException {
logger.debug("{} processing pending deletes", index);
final long startTimeNS = System.nanoTime();
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, "process pending deletes", timeout.millis());
int numRemoved = 0;
try {
Map<ShardId, ShardLock> locks = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ public void testShardLock() throws Exception {
final NodeEnvironment env = newNodeEnvironment();

Index index = new Index("foo", "fooUUID");
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
ShardLock fooLock = env.shardLock(new ShardId(index, 0), "1");
assertEquals(new ShardId(index, 0), fooLock.getShardId());

try {
env.shardLock(new ShardId(index, 0));
env.shardLock(new ShardId(index, 0), "2");
fail("shard is locked");
} catch (ShardLockObtainFailedException ex) {
// expected
Expand All @@ -149,19 +149,19 @@ public void testShardLock() throws Exception {
Files.createDirectories(path.resolve("1"));
}
try {
env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
env.lockAllForIndex(index, idxSettings, "3", randomIntBetween(0, 10));
fail("shard 0 is locked");
} catch (ShardLockObtainFailedException ex) {
// expected
}

fooLock.close();
// can lock again?
env.shardLock(new ShardId(index, 0)).close();
env.shardLock(new ShardId(index, 0), "4").close();

List<ShardLock> locks = env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
List<ShardLock> locks = env.lockAllForIndex(index, idxSettings, "5", randomIntBetween(0, 10));
try {
env.shardLock(new ShardId(index, 0));
env.shardLock(new ShardId(index, 0), "6");
fail("shard is locked");
} catch (ShardLockObtainFailedException ex) {
// expected
Expand Down Expand Up @@ -239,7 +239,7 @@ public void testResolveIndexFolders() throws Exception {
public void testDeleteSafe() throws Exception {
final NodeEnvironment env = newNodeEnvironment();
final Index index = new Index("foo", "fooUUID");
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
ShardLock fooLock = env.shardLock(new ShardId(index, 0), "1");
assertEquals(new ShardId(index, 0), fooLock.getShardId());

for (Path path : env.indexPaths(index)) {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
start.await();
try (ShardLock autoCloses = env.shardLock(new ShardId(index, 0))) {
try (ShardLock autoCloses = env.shardLock(new ShardId(index, 0), "2")) {
blockLatch.countDown();
Thread.sleep(randomIntBetween(1, 10));
}
Expand Down Expand Up @@ -353,7 +353,7 @@ public void run() {
for (int i = 0; i < iters; i++) {
int shard = randomIntBetween(0, counts.length - 1);
try {
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard),
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard), "1",
scaledRandomIntBetween(0, 10))) {
counts[shard].value++;
countsAtomic[shard].incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void testFailShard() throws Exception {
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
// but index can't be opened for a failed shard
assertThat("store index should be corrupted", StoreUtils.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(),
(shardId, lockTimeoutMS) -> new DummyShardLock(shardId)),
(shardId, lockTimeoutMS, details) -> new DummyShardLock(shardId)),
equalTo(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,17 +926,17 @@ public void testCanOpenIndex() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
Path tempDir = createTempDir();
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
writer.close();
assertTrue(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
assertTrue(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
store.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2462,7 +2462,7 @@ public synchronized void assertAfterTest() throws IOException {
Set<ShardId> shardIds = env.lockedShards();
for (ShardId id : shardIds) {
try {
env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close();
env.shardLock(id, "InternalTestCluster assert after test", TimeUnit.SECONDS.toMillis(5)).close();
} catch (ShardLockObtainFailedException ex) {
fail("Shard " + id + " is still locked after 5 sec waiting");
}
Expand Down