From a4e2230ebd5d424a78c266dc98f0521f897e580b Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 25 Nov 2014 13:58:31 +0100 Subject: [PATCH] Add index.data_path setting This allows specifying the path an index will be at. `index.data_path` is specified in the settings when creating an index, and can not be dynamically changed. An example request would look like: POST /myindex { "settings": { "number_of_shards": 2, "data_path": "/tmp/myindex" } } And would put data in /tmp/myindex/0/index/0 and /tmp/myindex/0/index/1 Since this can be used to write data to arbitrary locations on disk, it requires enabling the `node.enable_custom_paths` setting in elasticsearch.yml on all nodes. Relates to #8976 --- .../indices/create/CreateIndexRequest.java | 2 +- .../cluster/metadata/IndexMetaData.java | 1 + .../metadata/MetaDataCreateIndexService.java | 17 +- .../elasticsearch/env/NodeEnvironment.java | 171 +++++++++++++++--- .../org/elasticsearch/gateway/Gateway.java | 6 +- .../gateway/GatewayMetaState.java | 29 +-- .../gateway/GatewayShardsState.java | 36 ++-- .../gateway/MetaDataStateFormat.java | 14 +- .../elasticsearch/index/store/IndexStore.java | 16 +- .../store/support/AbstractIndexStore.java | 38 +++- .../index/translog/fs/FsTranslog.java | 18 +- .../elasticsearch/indices/IndicesService.java | 2 + .../indices/store/IndicesStore.java | 15 +- .../allocation/ClusterRerouteTests.java | 2 +- .../env/NodeEnvironmentTests.java | 94 +++++++++- .../gateway/GatewayShardStateTests.java | 8 +- .../indices/IndicesCustomDataPathTests.java | 159 ++++++++++++++++ .../store/IndicesStoreIntegrationTests.java | 1 - .../test/ElasticsearchIntegrationTest.java | 7 + .../test/InternalTestCluster.java | 1 + 20 files changed, 546 insertions(+), 91 deletions(-) create mode 100644 src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index 432891622d424..bd1aaba4a4605 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -48,9 +48,9 @@ import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream; import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; /** * A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}. diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 286641c664872..1e1cc5cb23286 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -167,6 +167,7 @@ public static State fromString(String state) { public static final String SETTING_UUID = "index.uuid"; public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type"; public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type"; + public static final String SETTING_DATA_PATH = "index.data_path"; public static final String INDEX_UUID_NA_VALUE = "_na_"; // hard-coded hash function as of 2.0 diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index cc89db535fac9..4edbe17f16bb1 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -26,6 +26,7 @@ import com.google.common.collect.Maps; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; @@ -57,6 +58,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperParsingException; @@ -64,6 +66,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.IndexService; import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.river.RiverIndexName; @@ -101,11 +104,13 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final String riverIndexName; private final AliasValidator aliasValidator; private final IndexTemplateFilter indexTemplateFilter; + private final NodeEnvironment nodeEnv; @Inject - public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, - AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName, - AliasValidator aliasValidator, Set indexTemplateFilters) { + public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, + IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService, + Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator, + Set indexTemplateFilters, NodeEnvironment nodeEnv) { super(settings); this.environment = environment; this.threadPool = threadPool; @@ -116,6 +121,7 @@ public MetaDataCreateIndexService(Settings settings, Environment environment, Th this.version = version; this.riverIndexName = riverIndexName; this.aliasValidator = aliasValidator; + this.nodeEnv = nodeEnv; if (indexTemplateFilters.isEmpty()) { this.indexTemplateFilter = DEFAULT_INDEX_TEMPLATE_FILTER; @@ -554,6 +560,11 @@ public int compare(IndexTemplateMetaData o1, IndexTemplateMetaData o2) { private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticsearchException { validateIndexName(request.index(), state); + String customPath = request.settings().get(IndexMetaData.SETTING_DATA_PATH, null); + if (customPath != null && nodeEnv.isCustomPathsEnabled() == false) { + throw new IndexCreationException(new Index(request.index()), + new ElasticsearchIllegalArgumentException("custom data_paths for indices is disabled")); + } } private static class DefaultIndexTemplateFilter implements IndexTemplateFilter { diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 237ead4fc5615..3697127d85aad 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -24,14 +24,19 @@ import com.google.common.primitives.Ints; import org.apache.lucene.store.*; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import java.io.Closeable; @@ -52,15 +57,30 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{ /* ${data.paths}/nodes/{node.id}/indices */ private final Path[] nodeIndicesPaths; private final Lock[] locks; + private final boolean addNodeId; private final int localNodeId; private final AtomicBoolean closed = new AtomicBoolean(false); private final Map shardLocks = new HashMap<>(); + private final boolean customPathsEnabled; + + // Setting to automatically append node id to custom data paths + public static final String ADD_NODE_ID_TO_CUSTOM_PATH = "node.add_id_to_custom_path"; + // Setting to enable custom index.data_path setting for new indices + public static final String SETTING_CUSTOM_DATA_PATH_ENABLED = "node.enable_custom_paths"; + + public static final String NODES_FOLDER = "nodes"; + public static final String INDICES_FOLDER = "indices"; + public static final String NODE_LOCK_FILENAME = "node.lock"; + @Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException { super(settings); + this.addNodeId = settings.getAsBoolean(ADD_NODE_ID_TO_CUSTOM_PATH, true); + this.customPathsEnabled = settings.getAsBoolean(SETTING_CUSTOM_DATA_PATH_ENABLED, false); + if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { nodePaths = null; nodeIndicesPaths = null; @@ -76,14 +96,14 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50); for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) { for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) { - Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(Paths.get("nodes", Integer.toString(possibleLockId))); + Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(Paths.get(NODES_FOLDER, Integer.toString(possibleLockId))); if (Files.exists(dir) == false) { Files.createDirectories(dir); } try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) { logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath()); - Lock tmpLock = luceneDir.makeLock("node.lock"); + Lock tmpLock = luceneDir.makeLock(NODE_LOCK_FILENAME); boolean obtained = tmpLock.obtain(); if (obtained) { locks[dirIndex] = tmpLock; @@ -117,7 +137,8 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce } } if (locks[0] == null) { - throw new ElasticsearchIllegalStateException("Failed to obtain node lock, is the following location writable?: " + Arrays.toString(environment.dataWithClusterFiles()), lastException); + throw new ElasticsearchIllegalStateException("Failed to obtain node lock, is the following location writable?: " + + Arrays.toString(environment.dataWithClusterFiles()), lastException); } this.localNodeId = localNodeId; @@ -131,14 +152,20 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder("node data locations details:\n"); for (Path file : nodePaths) { - sb.append(" -> ").append(file.toAbsolutePath()).append(", free_space [").append(new ByteSizeValue(Files.getFileStore(file).getUnallocatedSpace())).append("], usable_space [").append(new ByteSizeValue(Files.getFileStore(file).getUsableSpace())).append("]\n"); + sb.append(" -> ") + .append(file.toAbsolutePath()) + .append(", free_space [") + .append(new ByteSizeValue(Files.getFileStore(file).getUnallocatedSpace())) + .append("], usable_space [") + .append(new ByteSizeValue(Files.getFileStore(file).getUsableSpace())) + .append("]\n"); } logger.trace(sb.toString()); } this.nodeIndicesPaths = new Path[nodePaths.length]; for (int i = 0; i < nodePaths.length; i++) { - nodeIndicesPaths[i] = nodePaths[i].resolve("indices"); + nodeIndicesPaths[i] = nodePaths[i].resolve(INDICES_FOLDER); } } @@ -150,10 +177,20 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce * @param shardId the id of the shard to delete to delete * @throws IOException if an IOException occurs */ - public void deleteShardDirectorySafe(ShardId shardId) throws IOException { + public void deleteShardDirectorySafe(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException { + // This is to ensure someone doesn't use ImmutableSettings.EMPTY + assert indexSettings != ImmutableSettings.EMPTY; final Path[] paths = shardPaths(shardId); + logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths); try (Closeable lock = shardLock(shardId)) { IOUtils.rm(paths); + if (hasCustomDataPath(indexSettings)) { + Path customLocation = resolveCustomLocation(indexSettings, shardId); + logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation); + IOUtils.rm(customLocation); + } + logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths); + assert FileSystemUtils.exists(paths) == false; } } @@ -166,21 +203,25 @@ public void deleteShardDirectorySafe(ShardId shardId) throws IOException { * @param lockTimeoutMS how long to wait for acquiring the indices shard locks * @throws Exception if any of the shards data directories can't be locked or deleted */ - public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS) throws IOException { + public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException { + // This is to ensure someone doesn't use ImmutableSettings.EMPTY + assert indexSettings != ImmutableSettings.EMPTY; final List locks = lockAllForIndex(index, lockTimeoutMS); try { - final Path[] indexPaths = new Path[nodeIndicesPaths.length]; - for (int i = 0; i < indexPaths.length; i++) { - indexPaths[i] = nodeIndicesPaths[i].resolve(index.name()); - } + final Path[] indexPaths = indexPaths(index); + logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths); IOUtils.rm(indexPaths); + if (hasCustomDataPath(indexSettings)) { + Path customLocation = resolveCustomLocation(indexSettings, index.name()); + logger.trace("deleting custom index {} directory [{}]", index, customLocation); + IOUtils.rm(customLocation); + } } finally { IOUtils.closeWhileHandlingException(locks); } } - /** * Tries to lock all local shards for the given index. If any of the shard locks can't be acquired * an {@link LockObtainFailedException} is thrown and all previously acquired locks are released. @@ -192,7 +233,8 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS) throws IOE */ public List lockAllForIndex(Index index, long lockTimeoutMS) throws IOException { Set allShardIds = findAllShardIds(index); - List allLocks = new ArrayList<>(); + logger.trace("locking all shards for index {} - [{}]", index, allShardIds); + List allLocks = new ArrayList<>(allShardIds.size()); boolean success = false; long startTime = System.currentTimeMillis(); try { @@ -203,6 +245,7 @@ public List lockAllForIndex(Index index, long lockTimeoutMS) throws I success = true; } finally { if (success == false) { + logger.trace("unable to lock all shards for index {}", index); IOUtils.closeWhileHandlingException(allLocks); } } @@ -236,6 +279,7 @@ public ShardLock shardLock(ShardId id) throws IOException { * @throws IOException if an IOException occurs. */ public ShardLock shardLock(final ShardId id, long lockTimeoutMS) throws IOException { + logger.trace("acquiring node shardlock on [{}], timeout [{}]", id, lockTimeoutMS); final InternalShardLock shardLock; final boolean acquired; synchronized (shardLocks) { @@ -260,10 +304,12 @@ public ShardLock shardLock(final ShardId id, long lockTimeoutMS) throws IOExcept } } } + logger.trace("successfully acquired shardlock for [{}]", id); return new ShardLock(id) { // new instance prevents double closing @Override protected void closeInternal() { shardLock.release(); + logger.trace("released shard lock for [{}]", id); } }; } @@ -309,7 +355,10 @@ void incWaitCount() { private void decWaitCount() { synchronized (shardLocks) { assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0"; - if (--waitCount == 0) { + --waitCount; + logger.trace("shard lock wait count for [{}] is now [{}]", shardId, waitCount); + if (waitCount == 0) { + logger.trace("last shard lock wait decremented, removing lock for [{}]", shardId); InternalShardLock remove = shardLocks.remove(shardId); assert remove != null : "Removed lock was null"; } @@ -349,7 +398,7 @@ public Path[] nodeDataPaths() { } /** - * Returns all data paths for the given index. + * Returns all data paths excluding custom index paths. */ public Path[] indexPaths(Index index) { assert assertEnvIsLocked(); @@ -361,14 +410,30 @@ public Path[] indexPaths(Index index) { } /** - * Returns all data paths for the given shards ID + * Returns all paths where lucene data will be stored, if a index.data_path + * setting is present, will return the custom data path to be used + */ + public Path[] shardDataPaths(ShardId shardId, @IndexSettings Settings indexSettings) { + assert indexSettings != ImmutableSettings.EMPTY; + assert assertEnvIsLocked(); + if (hasCustomDataPath(indexSettings)) { + return new Path[] {resolveCustomLocation(indexSettings, shardId)}; + } else { + return shardPaths(shardId); + } + } + + /** + * Returns all shard paths excluding custom shard path */ public Path[] shardPaths(ShardId shardId) { assert assertEnvIsLocked(); final Path[] nodePaths = nodeDataPaths(); final Path[] shardLocations = new Path[nodePaths.length]; for (int i = 0; i < nodePaths.length; i++) { - shardLocations[i] = nodePaths[i].resolve(Paths.get("indices", shardId.index().name(), Integer.toString(shardId.id()))); + shardLocations[i] = nodePaths[i].resolve(Paths.get(INDICES_FOLDER, + shardId.index().name(), + Integer.toString(shardId.id()))); } return shardLocations; } @@ -395,14 +460,14 @@ public Set findAllIndices() throws Exception { } /** - * Tries to find all allocated shards for the given index or for all indices iff the given index is null + * Tries to find all allocated shards for the given index or for all indices iff the given index is {@code null} * on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might not * see directories created concurrently or while it's traversing. - * @param index the index to filter shards for or null if all shards for all indices should be listed + * @param index the index to filter shards for or {@code null} if all shards for all indices should be listed * @return a set of shard IDs * @throws IOException if an IOException occurs */ - public Set findAllShardIds(@Nullable final Index index) throws IOException { + public Set findAllShardIds(final Index index) throws IOException { if (nodePaths == null || locks == null) { throw new ElasticsearchIllegalStateException("node is not configured to store local location"); } @@ -435,7 +500,8 @@ private static Set findAllShardsForIndex(Path indexPath) throws IOExcep if (Files.exists(shardPath) && Files.isDirectory(shardPath)) { Integer shardId = Ints.tryParse(shardPath.getFileName().toString()); if (shardId != null) { - shardIds.add(new ShardId(currentIndex, shardId)); + ShardId id = new ShardId(currentIndex, shardId); + shardIds.add(id); } } } @@ -500,7 +566,9 @@ public void ensureAtomicMoveSupported() throws IOException { try { Files.move(src, target, StandardCopyOption.ATOMIC_MOVE); } catch (AtomicMoveNotSupportedException ex) { - throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + directory + "] atomic_move is required for elasticsearch to work correctly.", ex); + throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + + directory + + "] atomic_move is required for elasticsearch to work correctly.", ex); } finally { Files.deleteIfExists(src); Files.deleteIfExists(target); @@ -512,4 +580,63 @@ Settings getSettings() { // for testing return settings; } + /** return true if custom paths are allowed for indices */ + public boolean isCustomPathsEnabled() { + return customPathsEnabled; + } + + /** + * @param indexSettings settings for an index + * @return true if the index has a custom data path + */ + public static boolean hasCustomDataPath(@IndexSettings Settings indexSettings) { + return indexSettings.get(IndexMetaData.SETTING_DATA_PATH) != null; + } + + /** + * Resolve the custom path for a index's shard. + * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine + * the root path for the index. + * + * @param indexSettings settings for the index + */ + private Path resolveCustomLocation(@IndexSettings Settings indexSettings) { + assert indexSettings != ImmutableSettings.EMPTY; + String customDataDir = indexSettings.get(IndexMetaData.SETTING_DATA_PATH); + if (customDataDir != null) { + // This assert is because this should be caught by MetaDataCreateIndexService + assert customPathsEnabled; + if (addNodeId) { + return Paths.get(customDataDir, Integer.toString(this.localNodeId)); + } else { + return Paths.get(customDataDir); + } + } else { + throw new ElasticsearchIllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available"); + } + } + + /** + * Resolve the custom path for a index's shard. + * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine + * the root path for the index. + * + * @param indexSettings settings for the index + * @param indexName index to resolve the path for + */ + private Path resolveCustomLocation(@IndexSettings Settings indexSettings, final String indexName) { + return resolveCustomLocation(indexSettings).resolve(indexName); + } + + /** + * Resolve the custom path for a index's shard. + * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine + * the root path for the index. + * + * @param indexSettings settings for the index + * @param shardId shard to resolve the path to + */ + public Path resolveCustomLocation(@IndexSettings Settings indexSettings, final ShardId shardId) { + return resolveCustomLocation(indexSettings, shardId.index().name()).resolve(Integer.toString(shardId.id())); + } } diff --git a/src/main/java/org/elasticsearch/gateway/Gateway.java b/src/main/java/org/elasticsearch/gateway/Gateway.java index 8020ae8de245a..4811c8bb6bf43 100644 --- a/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -32,6 +32,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; +import java.nio.file.Path; + /** * */ @@ -165,7 +167,9 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t public void reset() throws Exception { try { - IOUtils.rm(nodeEnv.nodeDataPaths()); + Path[] dataPaths = nodeEnv.nodeDataPaths(); + logger.trace("removing node data paths: [{}]", dataPaths); + IOUtils.rm(dataPaths); } catch (Exception ex) { logger.debug("failed to delete shard locations", ex); } diff --git a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 90d3c287253f4..016e57756c060 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -50,12 +50,11 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; +import java.nio.file.*; import java.util.List; import java.util.Map; import java.util.Set; @@ -270,7 +269,7 @@ public void clusterChanged(ClusterChangedEvent event) { // it may take a couple of seconds for outstanding shard reference // to release their refs (for example, on going recoveries) // we are working on a better solution see: https://github.com/elasticsearch/elasticsearch/pull/8608 - nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis()); + nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis(), current.settings()); } catch (LockObtainFailedException ex) { logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index()); } catch (Exception ex) { @@ -321,7 +320,7 @@ public void clusterChanged(ClusterChangedEvent event) { final List shardLocks = nodeEnv.lockAllForIndex(index, 0); if (shardLocks.isEmpty()) { // no shards - try to remove the directory - nodeEnv.deleteIndexDirectorySafe(index, 0); + nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings()); continue; } IOUtils.closeWhileHandlingException(shardLocks); @@ -335,7 +334,7 @@ public void clusterChanged(ClusterChangedEvent event) { } else if (danglingTimeout.millis() == 0) { logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); try { - nodeEnv.deleteIndexDirectorySafe(index, 0); + nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings()); } catch (LockObtainFailedException ex) { logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName); } catch (Exception ex) { @@ -343,7 +342,11 @@ public void clusterChanged(ClusterChangedEvent event) { } } else { logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); - danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(index)))); + danglingIndices.put(indexName, + new DanglingIndex(indexName, + threadPool.schedule(danglingTimeout, + ThreadPool.Names.SAME, + new RemoveDanglingIndex(index, indexMetaData.settings())))); } } } @@ -441,7 +444,8 @@ private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable In final boolean deleteOldFiles = previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version(); final MetaDataStateFormat writer = indexStateFormat(format, formatParams, deleteOldFiles); try { - writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(), nodeEnv.indexPaths(new Index(indexMetaData.index()))); + writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(), + nodeEnv.indexPaths(new Index(indexMetaData.index()))); } catch (Throwable ex) { logger.warn("[{}]: failed to write index state", ex, indexMetaData.index()); throw new IOException("failed to write state for [" + indexMetaData.index() + "]", ex); @@ -482,7 +486,8 @@ private MetaData loadState() throws Exception { @Nullable private IndexMetaData loadIndexState(String index) throws IOException { - return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true), INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index))); + return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true), + INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index))); } private MetaData loadGlobalState() throws IOException { @@ -553,9 +558,11 @@ private void pre20Upgrade() throws Exception { class RemoveDanglingIndex implements Runnable { private final Index index; + private final Settings indexSettings; - RemoveDanglingIndex(Index index) { + RemoveDanglingIndex(Index index, @IndexSettings Settings indexSettings) { this.index = index; + this.indexSettings = indexSettings; } @Override @@ -570,7 +577,7 @@ public void run() { try { MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index)); - nodeEnv.deleteIndexDirectorySafe(index, 0); + nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings); } catch (Exception ex) { logger.debug("failed to delete dangling index", ex); } diff --git a/src/main/java/org/elasticsearch/gateway/GatewayShardsState.java b/src/main/java/org/elasticsearch/gateway/GatewayShardsState.java index f5a6b5409d7db..06ac59a2138d2 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayShardsState.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayShardsState.java @@ -26,18 +26,26 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; -import java.io.*; -import java.nio.file.*; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; @@ -56,7 +64,8 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat private volatile Map currentState = Maps.newHashMap(); @Inject - public GatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception { + public GatewayShardsState(Settings settings, NodeEnvironment nodeEnv, + TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception { super(settings); this.nodeEnv = nodeEnv; if (listGatewayStartedShards != null) { // for testing @@ -125,7 +134,7 @@ Map persistRoutingNodeState(RoutingNode routingNode) { ShardId shardId = shardRouting.shardId(); ShardStateInfo shardStateInfo = new ShardStateInfo(shardRouting.version(), shardRouting.primary()); final ShardStateInfo previous = currentState.get(shardId); - if(maybeWriteShardState(shardId, shardStateInfo, previous) ) { + if (maybeWriteShardState(shardId, shardStateInfo, previous)) { newState.put(shardId, shardStateInfo); } else if (previous != null) { currentState.put(shardId, previous); @@ -146,8 +155,10 @@ boolean maybeWriteShardState(ShardId shardId, ShardStateInfo shardStateInfo, Sha } else if (previousState.version < shardStateInfo.version) { writeReason = "version changed from [" + previousState.version + "] to [" + shardStateInfo.version + "]"; } else { - logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]"); - assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]"; + logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" + + previousState.version + "] current version [" + shardStateInfo.version + "]"); + assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId + + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]"; return previousState.version == shardStateInfo.version; } @@ -182,13 +193,16 @@ private Map loadShardsStateInfo() throws Exception { } private ShardStateInfo loadShardStateInfo(ShardId shardId) throws IOException { - return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN, shardId.toString(), nodeEnv.shardPaths(shardId)); + return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN, + shardId.toString(), nodeEnv.shardPaths(shardId)); } - private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception { + private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, + @Nullable ShardStateInfo previousStateInfo) throws Exception { logger.trace("{} writing shard state, reason [{}]", shardId, reason); final boolean deleteOldFiles = previousStateInfo != null && previousStateInfo.version != shardStateInfo.version; - newShardStateInfoFormat(deleteOldFiles).write(shardStateInfo, SHARD_STATE_FILE_PREFIX, shardStateInfo.version, nodeEnv.shardPaths(shardId)); + MetaDataStateFormat stateFormat = newShardStateInfoFormat(deleteOldFiles); + stateFormat.write(shardStateInfo, SHARD_STATE_FILE_PREFIX, shardStateInfo.version, nodeEnv.shardPaths(shardId)); } private MetaDataStateFormat newShardStateInfoFormat(boolean deleteOldFiles) { diff --git a/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index c57cf5eb5429a..83a0440a6c4fc 100644 --- a/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -31,14 +31,14 @@ import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.*; -import java.io.*; -import java.nio.file.*; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; diff --git a/src/main/java/org/elasticsearch/index/store/IndexStore.java b/src/main/java/org/elasticsearch/index/store/IndexStore.java index 5231c76dc9240..d979075694be5 100644 --- a/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -20,6 +20,8 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.StoreRateLimiting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import java.io.Closeable; @@ -44,14 +46,22 @@ public interface IndexStore extends Closeable { /** * Returns true if this shard is allocated on this node. Allocated means - * that it has storage files that can be deleted using {@link #deleteUnallocated(org.elasticsearch.index.shard.ShardId)}. + * that it has storage files that can be deleted using {@code deleteUnallocated(ShardId, Settings)}. */ - boolean canDeleteUnallocated(ShardId shardId); + boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings); /** * Deletes this shard store since its no longer allocated. */ - void deleteUnallocated(ShardId shardId) throws IOException; + void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException; + /** + * Return an array of all index folder locations for a given shard + */ Path[] shardIndexLocations(ShardId shardId); + + /** + * Return an array of all translog folder locations for a given shard + */ + Path[] shardTranslogLocations(ShardId shardId); } diff --git a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java index 689c6e6d9d247..f829885d8a472 100644 --- a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java @@ -46,6 +46,9 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type"; public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec"; + public static final String INDEX_FOLDER_NAME = "index"; + public static final String TRANSLOG_FOLDER_NAME = "translog"; + class ApplySettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { @@ -125,7 +128,7 @@ public StoreRateLimiting rateLimiting() { @Override - public boolean canDeleteUnallocated(ShardId shardId) { + public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) { if (locations == null) { return false; } @@ -136,7 +139,7 @@ public boolean canDeleteUnallocated(ShardId shardId) { } @Override - public void deleteUnallocated(ShardId shardId) throws IOException { + public void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException { if (locations == null) { return; } @@ -144,18 +147,39 @@ public void deleteUnallocated(ShardId shardId) throws IOException { throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted"); } try { - nodeEnv.deleteShardDirectorySafe(shardId); + nodeEnv.deleteShardDirectorySafe(shardId, indexSettings); } catch (Exception ex) { logger.debug("failed to delete shard locations", ex); } } + /** + * Return an array of all index folder locations for a given shard. Uses + * the index settings to determine if a custom data path is set for the + * index and uses that if applicable. + */ public Path[] shardIndexLocations(ShardId shardId) { - Path[] shardLocations = nodeEnv.shardPaths(shardId); - Path[] shardIndexLocations = new Path[shardLocations.length]; + Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings); + Path[] locations = new Path[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + locations[i] = shardLocations[i].resolve(INDEX_FOLDER_NAME); + } + logger.debug("using [{}] as shard's index location", locations); + return locations; + } + + /** + * Return an array of all translog folder locations for a given shard. Uses + * the index settings to determine if a custom data path is set for the + * index and uses that if applicable. + */ + public Path[] shardTranslogLocations(ShardId shardId) { + Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings); + Path[] locations = new Path[shardLocations.length]; for (int i = 0; i < shardLocations.length; i++) { - shardIndexLocations[i] = shardLocations[i].resolve("index"); + locations[i] = shardLocations[i].resolve(TRANSLOG_FOLDER_NAME); } - return shardIndexLocations; + logger.debug("using [{}] as shard's translog location", locations); + return locations; } } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 9cb1590157752..5467e0e4f0b52 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -31,12 +31,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.*; +import org.elasticsearch.index.store.IndexStore; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.index.translog.TranslogStreams; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -89,15 +92,14 @@ public void onRefreshSettings(Settings settings) { @Inject - public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv, BigArrays bigArrays) throws IOException { + public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, + BigArrays bigArrays, IndexStore indexStore) throws IOException { super(shardId, indexSettings); this.indexSettingsService = indexSettingsService; this.bigArrays = bigArrays; - Path[] shardLocations = nodeEnv.shardPaths(shardId); - this.locations = new Path[shardLocations.length]; - for (int i = 0; i < shardLocations.length; i++) { - locations[i] = shardLocations[i].resolve("translog"); - Files.createDirectories(locations[i]); + this.locations = indexStore.shardTranslogLocations(shardId); + for (Path location : locations) { + Files.createDirectories(location); } this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name())); diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index 792d5495d06d3..b8f336098072d 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; @@ -68,6 +69,7 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index fbd4c037ca424..1496799aa1265 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -20,16 +20,15 @@ package org.elasticsearch.indices.store; import org.apache.lucene.store.StoreRateLimiting; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.StreamInput; @@ -40,9 +39,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; @@ -171,7 +170,7 @@ public void clusterChanged(ClusterChangedEvent event) { } } else { if (!indexService.hasShard(shardId.id())) { - if (indexService.store().canDeleteUnallocated(shardId)) { + if (indexService.store().canDeleteUnallocated(shardId, indexService.settingsService().getSettings())) { deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable); } } @@ -229,6 +228,7 @@ boolean shardCanBeDeleted(ClusterState state, IndexShardRoutingTable indexShardR return true; } + // TODO will have to ammend this for shadow replicas so we don't delete the shared copy... private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) { List> requests = new ArrayList<>(indexShardRoutingTable.size()); String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getUUID(); @@ -320,6 +320,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { } IndexService indexService = indicesService.indexService(shardId.getIndex()); + IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex()); if (indexService == null) { // not physical allocation of the index, delete it from the file system if applicable if (nodeEnv.hasNodeFile()) { @@ -327,7 +328,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { if (FileSystemUtils.exists(shardLocations)) { logger.debug("{} deleting shard that is no longer used", shardId); try { - nodeEnv.deleteShardDirectorySafe(shardId); + nodeEnv.deleteShardDirectorySafe(shardId, indexMeta.settings()); } catch (Exception ex) { logger.debug("failed to delete shard locations", ex); } @@ -335,10 +336,10 @@ public ClusterState execute(ClusterState currentState) throws Exception { } } else { if (!indexService.hasShard(shardId.id())) { - if (indexService.store().canDeleteUnallocated(shardId)) { + if (indexService.store().canDeleteUnallocated(shardId, indexMeta.settings())) { logger.debug("{} deleting shard that is no longer used", shardId); try { - indexService.store().deleteUnallocated(shardId); + indexService.store().deleteUnallocated(shardId, indexMeta.settings()); } catch (Exception e) { logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId); } diff --git a/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java b/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java index 64744f9fef6ac..0e5b4a205ac5b 100644 --- a/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java +++ b/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java @@ -48,7 +48,7 @@ import java.util.List; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.hamcrest.Matchers.equalTo; /** diff --git a/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index c18950ce8c678..b1a9438a2265f 100644 --- a/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -40,8 +41,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.CoreMatchers.equalTo; + public class NodeEnvironmentTests extends ElasticsearchTestCase { + private final Settings idxSettings = ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).build(); + @Test public void testNodeLockSingleEnvironment() throws IOException { NodeEnvironment env = newNodeEnvironment(ImmutableSettings.builder() @@ -154,7 +160,7 @@ public void testDeleteSafe() throws IOException { } try { - env.deleteShardDirectorySafe(new ShardId("foo", 1)); + env.deleteShardDirectorySafe(new ShardId("foo", 1), idxSettings); fail("shard is locked"); } catch (LockObtainFailedException ex) { // expected @@ -166,7 +172,7 @@ public void testDeleteSafe() throws IOException { } - env.deleteShardDirectorySafe(new ShardId("foo", 2)); + env.deleteShardDirectorySafe(new ShardId("foo", 2), idxSettings); for (Path path : env.indexPaths(new Index("foo"))) { assertTrue(Files.exists(path.resolve("1"))); @@ -174,7 +180,7 @@ public void testDeleteSafe() throws IOException { } try { - env.deleteIndexDirectorySafe(new Index("foo"), randomIntBetween(0, 10)); + env.deleteIndexDirectorySafe(new Index("foo"), randomIntBetween(0, 10), idxSettings); fail("shard is locked"); } catch (LockObtainFailedException ex) { // expected @@ -204,7 +210,7 @@ protected void doRun() throws Exception { t.start(); } - env.deleteIndexDirectorySafe(new Index("foo"), 5000); + env.deleteIndexDirectorySafe(new Index("foo"), 5000, idxSettings); assertNull(threadException.get()); @@ -306,4 +312,84 @@ public void run() { } env.close(); } + + @Test + public void testCustomDataPaths() throws Exception { + String[] dataPaths = tmpPaths(); + NodeEnvironment env = newNodeEnvironment(dataPaths, ImmutableSettings.EMPTY); + + Settings s1 = ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build(); + Settings s2 = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, "/tmp/foo").build(); + ShardId sid = new ShardId("myindex", 0); + Index i = new Index("myindex"); + + assertFalse("no settings should mean no custom data path", NodeEnvironment.hasCustomDataPath(s1)); + assertTrue("settings with path_data should have a custom data path", NodeEnvironment.hasCustomDataPath(s2)); + + assertThat(env.shardDataPaths(sid, s1), equalTo(env.shardPaths(sid))); + assertThat(env.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/0/myindex/0")})); + + assertThat("shard paths with a custom data_path should contain only regular paths", + env.shardPaths(sid), + equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0"))); + + assertThat("index paths uses the regular template", + env.indexPaths(i), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex"))); + + env.close(); + NodeEnvironment env2 = newNodeEnvironment(dataPaths, + ImmutableSettings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, false).build()); + + assertThat(env2.shardDataPaths(sid, s1), equalTo(env2.shardPaths(sid))); + assertThat(env2.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/myindex/0")})); + + assertThat("shard paths with a custom data_path should contain only regular paths", + env2.shardPaths(sid), + equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0"))); + + assertThat("index paths uses the regular template", + env2.indexPaths(i), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex"))); + + env2.close(); + } + + /** Converts an array of Strings to an array of Paths, adding an additional child if specified */ + private Path[] stringsToPaths(String[] strings, String additional) { + Path[] locations = new Path[strings.length]; + for (int i = 0; i < strings.length; i++) { + locations[i] = Paths.get(strings[i], additional); + } + return locations; + } + + public String[] tmpPaths() { + final int numPaths = randomIntBetween(1, 3); + final String[] absPaths = new String[numPaths]; + for (int i = 0; i < numPaths; i++) { + absPaths[i] = newTempDirPath().toAbsolutePath().toString(); + } + return absPaths; + } + + public NodeEnvironment newNodeEnvironment() throws IOException { + return newNodeEnvironment(ImmutableSettings.EMPTY); + } + + public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException { + Settings build = ImmutableSettings.builder() + .put(settings) + .put("path.home", newTempDirPath().toAbsolutePath().toString()) + .put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true) + .putArray("path.data", tmpPaths()).build(); + return new NodeEnvironment(build, new Environment(build)); + } + + public NodeEnvironment newNodeEnvironment(String[] dataPaths, Settings settings) throws IOException { + Settings build = ImmutableSettings.builder() + .put(settings) + .put("path.home", newTempDirPath().toAbsolutePath().toString()) + .put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true) + .putArray("path.data", dataPaths).build(); + return new NodeEnvironment(build, new Environment(build)); + } } diff --git a/src/test/java/org/elasticsearch/gateway/GatewayShardStateTests.java b/src/test/java/org/elasticsearch/gateway/GatewayShardStateTests.java index 70cd0d41cc3a8..44a4c4dc57aa0 100644 --- a/src/test/java/org/elasticsearch/gateway/GatewayShardStateTests.java +++ b/src/test/java/org/elasticsearch/gateway/GatewayShardStateTests.java @@ -20,15 +20,15 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchTestCase; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class GatewayShardStateTests extends ElasticsearchTestCase { diff --git a/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java b/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java new file mode 100644 index 0000000000000..9e981d6ce5238 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java @@ -0,0 +1,159 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.*; + +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for custom data path locations and templates + */ +public class IndicesCustomDataPathTests extends ElasticsearchIntegrationTest { + + private String path; + + @Before + public void setup() { + path = newTempDirPath().toAbsolutePath().toString(); + } + + @After + public void teardown() throws Exception { + IOUtils.deleteFilesIgnoringExceptions(Paths.get(path)); + } + + @Test + public void testDataPathCanBeChanged() throws Exception { + final String INDEX = "idx"; + Path root = newTempDirPath(); + Path startDir = root.resolve("start"); + Path endDir = root.resolve("end"); + logger.info("--> start dir: [{}]", startDir.toAbsolutePath().toString()); + logger.info("--> end dir: [{}]", endDir.toAbsolutePath().toString()); + // temp dirs are automatically created, but the end dir is what + // startDir is going to be renamed as, so it needs to be deleted + // otherwise we get all sorts of errors about the directory + // already existing + IOUtils.rm(endDir); + + ImmutableSettings.Builder sb = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, + startDir.toAbsolutePath().toString()); + ImmutableSettings.Builder sb2 = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, + endDir.toAbsolutePath().toString()); + + logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString()); + client().admin().indices().prepareCreate(INDEX).setSettings(sb).get(); + ensureGreen(INDEX); + + indexRandom(true, client().prepareIndex(INDEX, "doc", "1").setSource("{\"body\": \"foo\"}")); + + SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + + logger.info("--> closing the index [{}]", INDEX); + client().admin().indices().prepareClose(INDEX).get(); + logger.info("--> index closed, re-opening..."); + client().admin().indices().prepareOpen(INDEX).get(); + logger.info("--> index re-opened"); + ensureGreen(INDEX); + + resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + + // Now, try closing and changing the settings + + logger.info("--> closing the index [{}]", INDEX); + client().admin().indices().prepareClose(INDEX).get(); + + logger.info("--> moving data on disk [{}] to [{}]", startDir.getFileName(), endDir.getFileName()); + assert Files.exists(endDir) == false : "end directory should not exist!"; + Files.move(startDir, endDir, StandardCopyOption.REPLACE_EXISTING); + + logger.info("--> updating settings..."); + client().admin().indices().prepareUpdateSettings(INDEX) + .setSettings(sb2) + .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) + .get(); + + assert Files.exists(startDir) == false : "start dir shouldn't exist"; + + logger.info("--> settings updated and files moved, re-opening index"); + client().admin().indices().prepareOpen(INDEX).get(); + logger.info("--> index re-opened"); + ensureGreen(INDEX); + + resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + + assertAcked(client().admin().indices().prepareDelete(INDEX)); + assertPathHasBeenCleared(startDir.toAbsolutePath().toString()); + assertPathHasBeenCleared(endDir.toAbsolutePath().toString()); + } + + @Test + public void testIndexCreatedWithCustomPathAndTemplate() throws Exception { + final String INDEX = "myindex2"; + + logger.info("--> creating an index with data_path [{}]", path); + ImmutableSettings.Builder sb = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, path); + + client().admin().indices().prepareCreate(INDEX).setSettings(sb).get(); + ensureGreen(INDEX); + + indexRandom(true, client().prepareIndex(INDEX, "doc", "1").setSource("{\"body\": \"foo\"}")); + + SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + assertAcked(client().admin().indices().prepareDelete(INDEX)); + assertPathHasBeenCleared(path); + } + + private void assertPathHasBeenCleared(String path) throws Exception { + int count = 0; + StringBuilder sb = new StringBuilder(); + sb.append("["); + if (Files.exists(Paths.get(path))) { + try (DirectoryStream stream = Files.newDirectoryStream(Paths.get(path))) { + for (Path file : stream) { + if (Files.isRegularFile(file)) { + count++; + sb.append(file.toAbsolutePath().toString()); + sb.append("\n"); + } + } + } + } + sb.append("]"); + assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0)); + } +} diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java index 0ee1499f5a55c..21dd3ed64de14 100644 --- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import java.nio.file.Files; diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 2acb7ebd4cecd..78887457be7bd 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -61,6 +61,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -739,6 +740,12 @@ public Settings indexSettings() { if (numberOfReplicas >= 0) { builder.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); } + // 30% of the time + if (randomInt(9) < 3) { + String dataPath = "data/custom-" + CHILD_JVM_ID + "/" + UUID.randomUUID().toString(); + logger.info("using custom data_path for index: [{}]", dataPath); + builder.put(IndexMetaData.SETTING_DATA_PATH, dataPath); + } return builder.build(); } diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 16fb22cabb073..8223e170c48f4 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -276,6 +276,7 @@ public InternalTestCluster(long clusterSeed, builder.put("script.disable_dynamic", false); builder.put("http.pipelining", enableHttpPipelining); builder.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false); + builder.put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true); if (Strings.hasLength(System.getProperty("es.logger.level"))) { builder.put("logger.level", System.getProperty("es.logger.level")); }