Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,17 @@ public void testLockTryingToDelete() throws Exception {
// Test without the regular shard lock to assume we can acquire it
// (worst case, meaning that the shard lock could be acquired and
// we're green to delete the shard's directory)
ShardLock sLock = new DummyShardLock(new ShardId(index, 0));
try {
env.deleteShardDirectoryUnderLock(sLock, IndexSettingsModule.newIndexSettings("test", Settings.EMPTY));
fail("should not have been able to delete the directory");
} catch (LockObtainFailedException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
}
final ShardLock sLock = new DummyShardLock(new ShardId(index, 0));
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY);

final LockObtainFailedException exception = expectThrows(LockObtainFailedException.class, () ->
env.deleteShardDirectoryUnderLock(sLock, indexSettings, indexPaths -> {
assert false : "should not be called " + indexPaths;
}));
assertThat(exception.getMessage(), exception.getMessage(), containsString("unable to acquire write.lock"));
}

public void testDurableFlagHasEffect() throws Exception {
public void testDurableFlagHasEffect() {
createIndex("test");
ensureGreen();
client().prepareIndex("test").setId("1").setSource("{}", XContentType.JSON).get();
Expand Down

Large diffs are not rendered by default.

31 changes: 24 additions & 7 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -553,11 +554,15 @@ private static String toString(Collection<String> items) {
* @param shardId the id of the shard to delete to delete
* @throws IOException if an IOException occurs
*/
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
public void deleteShardDirectorySafe(
ShardId shardId,
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException, ShardLockObtainFailedException {
final Path[] paths = availableShardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
try (ShardLock lock = shardLock(shardId, "shard deletion under lock")) {
deleteShardDirectoryUnderLock(lock, indexSettings);
deleteShardDirectoryUnderLock(lock, indexSettings, listener);
}
}

Expand Down Expand Up @@ -602,18 +607,24 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh
* @throws IOException if an IOException occurs
* @throws ElasticsearchException if the write.lock is not acquirable
*/
public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException {
public void deleteShardDirectoryUnderLock(
ShardLock lock,
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException {
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
listener.accept(paths);
IOUtils.rm(paths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId);
logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation);
acquireFSLockForPaths(indexSettings, customLocation);
logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
listener.accept(new Path[]{customLocation});
IOUtils.rm(customLocation);
}
logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths);
Expand Down Expand Up @@ -665,11 +676,15 @@ private boolean isShardLocked(ShardId id) {
* @param indexSettings settings for the index being deleted
* @throws IOException if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
throws IOException, ShardLockObtainFailedException {
public void deleteIndexDirectorySafe(
Index index,
long lockTimeoutMS,
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException, ShardLockObtainFailedException {
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, "deleting index directory", lockTimeoutMS);
try {
deleteIndexDirectoryUnderLock(index, indexSettings);
deleteIndexDirectoryUnderLock(index, indexSettings, listener);
} finally {
IOUtils.closeWhileHandlingException(locks);
}
Expand All @@ -682,13 +697,15 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
* @param index the index to delete
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings, Consumer<Path[]> listener) throws IOException {
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
listener.accept(indexPaths);
IOUtils.rm(indexPaths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveIndexCustomLocation(indexSettings.customDataPath(), index.getUUID());
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
listener.accept(new Path[]{customLocation});
IOUtils.rm(customLocation);
}
}
Expand Down
5 changes: 3 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry) throws IOException {
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory =
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
Expand Down Expand Up @@ -442,7 +443,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
valuesSourceRegistry, recoveryStateFactory);
valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener);
success = true;
return indexService;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final BitsetFilterCache bitsetFilterCache;
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
Expand Down Expand Up @@ -178,7 +179,9 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory) {
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -219,6 +222,7 @@ public IndexService(
}

this.shardStoreDeleter = shardStoreDeleter;
this.indexFoldersDeletionListener = indexFoldersDeletionListener;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.scriptService = scriptService;
Expand Down Expand Up @@ -414,7 +418,8 @@ public synchronized IndexShard createShard(
} catch (IllegalStateException ex) {
logger.warn("{} failed to load shard path, trying to remove leftover", shardId);
try {
ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings);
ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings, shardPaths ->
indexFoldersDeletionListener.beforeShardFoldersDeleted(shardId, this.indexSettings, shardPaths));
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
} catch (Exception inner) {
ex.addSuppressed(inner);
Expand Down
13 changes: 10 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/ShardPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public final class ShardPath {
public static final String INDEX_FOLDER_NAME = "index";
Expand Down Expand Up @@ -172,8 +173,13 @@ public static ShardPath loadShardPath(Logger logger, ShardId shardId, String cus
* This method tries to delete left-over shards where the index name has been reused but the UUID is different
* to allow the new shard to be allocated.
*/
public static void deleteLeftoverShardDirectory(Logger logger, NodeEnvironment env,
ShardLock lock, IndexSettings indexSettings) throws IOException {
public static void deleteLeftoverShardDirectory(
final Logger logger,
final NodeEnvironment env,
final ShardLock lock,
final IndexSettings indexSettings,
final Consumer<Path[]> listener
) throws IOException {
final String indexUUID = indexSettings.getUUID();
final Path[] paths = env.availableShardPaths(lock.getShardId());
for (Path path : paths) {
Expand All @@ -183,7 +189,8 @@ public static void deleteLeftoverShardDirectory(Logger logger, NodeEnvironment e
if (load.indexUUID.equals(indexUUID) == false && IndexMetadata.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) {
logger.warn("{} deleting leftover shard on path: [{}] with a different index UUID", lock.getShardId(), path);
assert Files.isDirectory(path) : path + " is not a directory";
NodeEnvironment.acquireFSLockForPaths(indexSettings, paths);
NodeEnvironment.acquireFSLockForPaths(indexSettings, path);
listener.accept(new Path[]{path});
IOUtils.rm(path);
}
}
Expand Down
24 changes: 17 additions & 7 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.store.CompositeIndexFoldersDeletionListener;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.PluginsService;
Expand Down Expand Up @@ -222,6 +223,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListeners;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;
Expand Down Expand Up @@ -250,7 +252,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners) {
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -297,6 +300,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.indexFoldersDeletionListeners = new CompositeIndexFoldersDeletionListener(indexFoldersDeletionListeners);
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -674,7 +678,8 @@ private synchronized IndexService createIndexService(IndexService.IndexCreationC
indicesFieldDataCache,
namedWriteableRegistry,
this::isIdFieldDataEnabled,
valuesSourceRegistry
valuesSourceRegistry,
indexFoldersDeletionListeners
);
}

Expand Down Expand Up @@ -918,7 +923,8 @@ private void deleteIndexStoreIfDeletionAllowed(final String reason, final Index
logger.debug("{} deleting index store reason [{}]", index, reason);
if (predicate.apply(index, indexSettings)) {
// its safe to delete all index metadata and shard data
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings,
paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths));
}
success = true;
} catch (ShardLockObtainFailedException ex) {
Expand Down Expand Up @@ -947,7 +953,8 @@ private void deleteIndexStoreIfDeletionAllowed(final String reason, final Index
public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException {
ShardId shardId = lock.getShardId();
logger.trace("{} deleting shard reason [{}]", shardId, reason);
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings);
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings,
paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths));
}

/**
Expand All @@ -972,7 +979,8 @@ public void deleteShardStore(String reason, ShardId shardId, ClusterState cluste
if (shardDeletionCheckResult != ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE) {
throw new IllegalStateException("Can't delete shard " + shardId + " (cause: " + shardDeletionCheckResult + ")");
}
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings,
paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths));
logger.debug("{} deleted shard reason [{}]", shardId, reason);

if (canDeleteIndexContents(shardId.getIndex(), indexSettings)) {
Expand Down Expand Up @@ -1210,14 +1218,16 @@ public void processPendingDeletes(Index index, IndexSettings indexSettings, Time
assert delete.shardId == -1;
logger.debug("{} deleting index store reason [{}]", index, "pending delete");
try {
nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings);
nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings,
paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths));
iterator.remove();
} catch (IOException ex) {
logger.debug(() -> new ParameterizedMessage("{} retry pending delete", index), ex);
}
} else {
assert delete.shardId != -1;
ShardLock shardLock = locks.get(new ShardId(delete.index, delete.shardId));
final ShardId shardId = new ShardId(delete.index, delete.shardId);
final ShardLock shardLock = locks.get(shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
Expand Down
Loading