diff --git a/docs/reference/indices.asciidoc b/docs/reference/indices.asciidoc
index 11a73784919e5..616d24ffbbbbe 100644
--- a/docs/reference/indices.asciidoc
+++ b/docs/reference/indices.asciidoc
@@ -41,6 +41,11 @@ and warmers.
 * <>
 * <>
 
+[float]
+[[shadow-replicas]]
+== Replica configurations
+* <<indices-shadow-replicas>>
+
 [float]
 [[monitoring]]
 == Monitoring:
@@ -107,3 +112,4 @@ include::indices/optimize.asciidoc[]
 include::indices/upgrade.asciidoc[]
 
+include::indices/shadow-replicas.asciidoc[]
diff --git a/docs/reference/indices/shadow-replicas.asciidoc b/docs/reference/indices/shadow-replicas.asciidoc
new file mode 100644
index 0000000000000..1f4db1c90cf20
--- /dev/null
+++ b/docs/reference/indices/shadow-replicas.asciidoc
@@ -0,0 +1,105 @@
+[[indices-shadow-replicas]]
+== Shadow replica indices
+
+experimental[]
+
+If you would like to use a shared filesystem, you can use the shadow replicas
+settings to choose where on disk the data for an index should be kept, as well
+as how Elasticsearch should replay operations on all the replica shards of an
+index.
+
+In order to fully utilize the `index.data_path` and `index.shadow_replicas`
+settings, you first need to enable this feature in `elasticsearch.yml`:
+
+[source,yaml]
+--------------------------------------------------
+node.enable_custom_paths: true
+--------------------------------------------------
+
+You can then create an index with a custom data path, where each node will use
+this path for the data:
+
+[WARNING]
+========================
+Because shadow replicas do not index the document on replica shards, it's
+possible for the replica's known mapping to be behind the index's known mapping
+if the latest cluster state has not yet been processed on the node containing
+the replica. Because of this, it is highly recommended to use pre-defined
+mappings when using shadow replicas.
+========================
+
+[source,js]
+--------------------------------------------------
+curl -XPUT 'localhost:9200/my_index' -d '
+{
+    "index" : {
+        "number_of_shards" : 1,
+        "number_of_replicas" : 4,
+        "data_path": "/var/data/my_index",
+        "shadow_replicas": true
+    }
+}'
+--------------------------------------------------
+
+[WARNING]
+========================
+In the above example, the "/var/data/my_index" path is a shared filesystem that
+must be available on every node in the Elasticsearch cluster. You must also
+ensure that the Elasticsearch process has the correct permissions to read from
+and write to the directory used in the `index.data_path` setting.
+========================
+
+An index that has been created with the `index.shadow_replicas` setting set to
+"true" will not replicate document operations to any of the replica shards;
+instead, it will only continually refresh. Once segments are available on the
+filesystem where the shadow replica resides (after an Elasticsearch "flush"), a
+regular refresh (governed by the `index.refresh_interval`) can be used to make
+the new data searchable.
+
+NOTE: Since documents are only indexed on the primary shard, realtime GET
+requests could fail to return a document if executed on the replica shard;
+therefore, GET API requests automatically have the `?preference=_primary` flag
+set if there is no preference flag already set.
+
+To ensure the data is synchronized quickly enough, you may need to tune the
+flush threshold for the index. A flush is needed to fsync segment files to
+disk, so they will be visible to all other replica nodes.
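+
+For example, the flush threshold could be lowered through the update settings
+API, as in this sketch (the `translog.flush_threshold_*` keys shown here are
+illustrative of the dynamic translog settings available at this time):
+
+[source,js]
+--------------------------------------------------
+curl -XPUT 'localhost:9200/my_index/_settings' -d '
+{
+    "index" : {
+        "translog.flush_threshold_size" : "256mb",
+        "translog.flush_threshold_ops" : 5000
+    }
+}'
+--------------------------------------------------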
+Users should test what flush threshold levels they are comfortable with, as
+increased flushing can impact indexing performance.
+
+The Elasticsearch cluster will still detect the loss of a primary shard, and
+transform the replica into a primary in this situation. This transformation
+will take slightly longer, since no `IndexWriter` is maintained for each
+shadow replica.
+
+Below is the list of settings that can be changed using the update
+settings API:
+
+`index.data_path` (string)::
+    Path to use for the index's data. Note that by default Elasticsearch will
+    append the node ordinal to the path to ensure multiple instances of
+    Elasticsearch on the same machine do not share a data directory.
+
+`index.shadow_replicas`::
+    Boolean value indicating this index should use shadow replicas. Defaults to
+    `false`.
+
+`index.shared_filesystem`::
+    Boolean value indicating this index uses a shared filesystem. Defaults to
+    `true` if `index.shadow_replicas` is set to true, `false` otherwise.
+
+=== Node level settings related to shadow replicas
+
+These are non-dynamic settings that need to be configured in
+`elasticsearch.yml`:
+
+`node.add_id_to_custom_path`::
+    Boolean setting indicating whether Elasticsearch should append the node's
+    ordinal to the custom data path. For example, if this is enabled and a path
+    of "/tmp/foo" is used, the first locally-running node will use "/tmp/foo/0",
+    the second will use "/tmp/foo/1", the third "/tmp/foo/2", etc. Defaults to
+    `true`.
+
+`node.enable_custom_paths`::
+    Boolean value that must be set to `true` in order to use the
+    `index.data_path` setting. Defaults to `false`.
diff --git a/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/src/main/java/org/elasticsearch/action/get/TransportGetAction.java
index 0da7d9a89c4f0..39626a18b076a 100644
--- a/src/main/java/org/elasticsearch/action/get/TransportGetAction.java
+++ b/src/main/java/org/elasticsearch/action/get/TransportGetAction.java
@@ -25,13 +25,15 @@
 import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.Preference;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -74,6 +76,14 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
         if (request.request().realtime == null) {
             request.request().realtime = this.realtime;
         }
+        IndexMetaData indexMeta = state.getMetaData().index(request.concreteIndex());
+        if (request.request().realtime && // if the realtime flag is set
+                request.request().preference() == null && // the preference flag is not already set
+                indexMeta != null && // and we have the index
+                IndexMetaData.isIndexUsingShadowReplicas(indexMeta.settings())) { // and the index uses shadow replicas
+            // set the preference for the request to use "_primary" automatically
+
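// shadow replicas are non-realtime: replica shards have no translog, so a
+            // realtime GET served from a replica could miss a document that is
+            // not yet searchable, hence the automatic "_primary" preference
+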
request.request().preference(Preference.PRIMARY.type()); + } // update the routing (request#index here is possibly an alias) request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index())); // Fail fast on the node that received the request. diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 43c2818dc9248..5aa1b7adf82e5 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -45,11 +45,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; @@ -631,9 +631,23 @@ void performOnReplica(final ReplicationState state, final ShardRouting shard, fi } final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest()); + + // If the replicas use shadow replicas, there is no reason to + // perform the action on the replica, so skip it and + // immediately return + if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) { + // this delays mapping updates on replicas because they have + // to wait until they get the new mapping through the cluster + // state, which is why we recommend pre-defined mappings for + // indices using shadow replicas + state.onReplicaSuccess(); + return; + } + if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { final DiscoveryNode node = observer.observedState().nodes().get(nodeId); - transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(node, transportReplicaAction, shardRequest, + transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty vResponse) { state.onReplicaSuccess(); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 1d2e9646ae4b4..253b27ddc96df 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -157,6 +157,8 @@ public static State fromString(String state) { public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards"; public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas"; + public static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas"; + public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem"; public static final String 
SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
 public static final String SETTING_READ_ONLY = "index.blocks.read_only";
 public static final String SETTING_BLOCKS_READ = "index.blocks.read";
@@ -784,4 +786,25 @@ public static void writeTo(IndexMetaData indexMetaData, StreamOutput out) throws
             }
         }
     }
+
+    /**
+     * Returns true iff the given settings indicate that the index
+     * associated with these settings allocates its shards on a shared
+     * filesystem. Otherwise false. The default setting for this
+     * is the returned value from
+     * {@link #isIndexUsingShadowReplicas(org.elasticsearch.common.settings.Settings)}.
+     */
+    public static boolean isOnSharedFilesystem(Settings settings) {
+        return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
+    }
+
+    /**
+     * Returns true iff the given settings indicate that the index associated
+     * with these settings uses shadow replicas. Otherwise false. The default
+     * setting for this is false.
+     */
+    public static boolean isIndexUsingShadowReplicas(Settings settings) {
+        return settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false);
+    }
+
 }
diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java
index ab8cb91a322db..0dbd9985a88ff 100644
--- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java
+++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -224,6 +224,7 @@ private boolean isShardLocked(ShardId id) {
      *
      * @param index the index to delete
      * @param lockTimeoutMS how long to wait for acquiring the indices shard locks
+     * @param indexSettings settings for the index being deleted
      * @throws Exception if any of the shards data directories can't be locked or deleted
      */
     public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException {
diff --git a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
index 29dab15f78cb4..2700b5d3a0b86 100644
--- a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
+++ b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -32,8 +32,8 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.HashFunction;
 import org.elasticsearch.cluster.routing.DjbHashFunction;
+import org.elasticsearch.cluster.routing.HashFunction;
 import org.elasticsearch.cluster.routing.SimpleHashFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -54,7 +54,9 @@
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.nio.file.*;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java
index be85ace475e17..74202a6bad1d0 100644
--- a/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/src/main/java/org/elasticsearch/index/IndexService.java
@@ -282,7 +282,7 @@ public String indexUUID() {
         return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
     }
 
-    public synchronized IndexShard createShard(int sShardId) throws ElasticsearchException {
+    public synchronized 
IndexShard createShard(int sShardId, boolean primary) throws ElasticsearchException { /* * TODO: we execute this in parallel but it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just @@ -304,15 +304,17 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticsearchExc indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings); logger.debug("creating shard_id {}", shardId); - + // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. + final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false || + (primary && IndexMetaData.isOnSharedFilesystem(indexSettings)); ModulesBuilder modules = new ModulesBuilder(); modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); - modules.add(new IndexShardModule(shardId, indexSettings)); + modules.add(new IndexShardModule(shardId, primary, indexSettings)); modules.add(new ShardIndexingModule()); modules.add(new ShardSearchModule()); modules.add(new ShardGetModule()); modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock, - new StoreCloseListener(shardId))); + new StoreCloseListener(shardId, canDeleteShardContent))); modules.add(new DeletionPolicyModule(indexSettings)); modules.add(new MergePolicyModule(indexSettings)); modules.add(new MergeSchedulerModule(indexSettings)); @@ -430,10 +432,12 @@ private void closeInjectorResource(ShardId shardId, Injector shardInjector, Clas } } - private void onShardClose(ShardLock lock) { + private void onShardClose(ShardLock lock, boolean ownsShard) { if (deleted.get()) { // we remove that shards content if this index has been deleted try { - indicesServices.deleteShardStore("delete index", lock, indexSettings); + if (ownsShard) { + indicesServices.deleteShardStore("delete index", lock, indexSettings); + } } catch (IOException e) { logger.warn("{} failed to delete shard content", e, lock.getShardId()); } @@ -442,15 +446,17 @@ private void onShardClose(ShardLock lock) { private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; + private final boolean ownsShard; - public StoreCloseListener(ShardId shardId) { + public StoreCloseListener(ShardId shardId, boolean ownsShard) { this.shardId = shardId; + this.ownsShard = ownsShard; } @Override public void handle(ShardLock lock) { assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId(); - onShardClose(lock); + onShardClose(lock, ownsShard); } } diff --git a/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java b/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java index 5c975789408e4..02f5658c40357 100644 --- a/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java +++ b/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java @@ -54,8 +54,7 @@ public String[] getFiles() { } /** - * Releases the current snapshot, returning true if it was - * actually released. + * Releases the current snapshot. 
*/ public void close() { deletionPolicy.close(getGeneration()); diff --git a/src/main/java/org/elasticsearch/index/engine/EngineFactory.java b/src/main/java/org/elasticsearch/index/engine/EngineFactory.java index 89425dc46826d..ab5cfc5e6a916 100644 --- a/src/main/java/org/elasticsearch/index/engine/EngineFactory.java +++ b/src/main/java/org/elasticsearch/index/engine/EngineFactory.java @@ -23,5 +23,7 @@ */ public interface EngineFactory { - public Engine newEngine(EngineConfig config); + public Engine newReadWriteEngine(EngineConfig config); + + public Engine newReadOnlyEngine(EngineConfig config); } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java b/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java index 0af65d08671ab..3857e9391c24e 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java @@ -20,7 +20,12 @@ public class InternalEngineFactory implements EngineFactory { @Override - public Engine newEngine(EngineConfig config) { + public Engine newReadWriteEngine(EngineConfig config) { return new InternalEngine(config); } + + @Override + public Engine newReadOnlyEngine(EngineConfig config) { + return new ShadowEngine(config); + } } diff --git a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java new file mode 100644 index 0000000000000..0fd4accf166e2 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -0,0 +1,208 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ShadowEngine is a specialized engine that only allows read-only operations
+ * on the underlying Lucene index. An {@code IndexReader} is opened instead of
+ * an {@code IndexWriter}. All methods that would usually perform write
+ * operations are no-ops; this means:
+ *
+ * - No operations are written to or read from the translog
+ * - Create, Index, and Delete do nothing
+ * - Flush does not fsync any files or make any on-disk changes
+ *
+ * In order for new segments to become visible, the ShadowEngine may perform
+ * stage1 of the traditional recovery process (copying segment files) from a
+ * regular primary (which uses {@link org.elasticsearch.index.engine.InternalEngine}).
+ *
+ * Notice that since this Engine does not deal with the translog, any
+ * {@link #get(Get get)} request goes directly to the searcher, meaning it is
+ * non-realtime.
+ */ +public class ShadowEngine extends Engine { + + private volatile SearcherManager searcherManager; + + private SegmentInfos lastCommittedSegmentInfos; + + public ShadowEngine(EngineConfig engineConfig) { + super(engineConfig); + SearcherFactory searcherFactory = new EngineSearcherFactory(engineConfig); + try { + DirectoryReader reader = null; + store.incRef(); + boolean success = false; + try { + reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(store.directory()), shardId); + this.searcherManager = new SearcherManager(reader, searcherFactory); + this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + success = true; + } catch (Throwable e) { + logger.warn("failed to create new reader", e); + throw e; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(reader); + store.decRef(); + } + } + } catch (IOException ex) { + throw new EngineCreationFailureException(shardId, "failed to open index reader", ex); + } + } + + + @Override + public void create(Create create) throws EngineException { + throw new UnsupportedOperationException(shardId + " create operation not allowed on shadow engine"); + } + + @Override + public void index(Index index) throws EngineException { + throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine"); + } + + @Override + public void delete(Delete delete) throws EngineException { + throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine"); + } + + @Override + public void delete(DeleteByQuery delete) throws EngineException { + throw new UnsupportedOperationException(shardId + " delete-by-query operation not allowed on shadow engine"); + } + + @Override + public void flush() throws EngineException { + flush(false, false); + } + + @Override + public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + logger.trace("skipping FLUSH on shadow engine"); + // reread the last committed segment infos + refresh("flush"); + try (ReleasableLock _ = readLock.acquire()) { + lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + } catch (Throwable e) { + if (isClosed.get() == false) { + logger.warn("failed to read latest segment infos on flush", e); + if (Lucene.isCorruptionException(e)) { + throw new FlushFailedEngineException(shardId, e); + } + } + } + } + + @Override + public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException { + // no-op + logger.trace("skipping FORCE-MERGE on shadow engine"); + } + + @Override + public GetResult get(Get get) throws EngineException { + // There is no translog, so we can get it directly from the searcher + return getFromSearcher(get); + } + + @Override + public List segments(boolean verbose) { + try (ReleasableLock _ = readLock.acquire()) { + Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose); + for (int i = 0; i < segmentsArr.length; i++) { + // hard code all segments as committed, because they are in + // order for the shadow replica to see them + segmentsArr[i].committed = true; + } + return Arrays.asList(segmentsArr); + } + } + + @Override + public void refresh(String source) throws EngineException { + // we obtain a read lock here, since we don't want a flush to happen while we are refreshing + // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) + try (ReleasableLock _ = readLock.acquire()) { + ensureOpen(); + 
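// refresh the searcher from the latest on-disk segments, blocking
+            // until any refresh already running on another thread completes
+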
searcherManager.maybeRefreshBlocking(); + } catch (AlreadyClosedException e) { + ensureOpen(); + } catch (EngineClosedException e) { + throw e; + } catch (Throwable t) { + failEngine("refresh failed", t); + throw new RefreshFailedEngineException(shardId, t); + } + } + + @Override + public SnapshotIndexCommit snapshotIndex() throws EngineException { + throw new UnsupportedOperationException("Can not take snapshot from a shadow engine"); + } + + @Override + public void recover(RecoveryHandler recoveryHandler) throws EngineException { + throw new UnsupportedOperationException("Can not recover from a shadow engine"); + } + + @Override + protected SearcherManager getSearcherManager() { + return searcherManager; + } + + @Override + protected void closeNoLock(String reason) throws ElasticsearchException { + if (isClosed.compareAndSet(false, true)) { + try { + logger.debug("shadow replica close searcher manager refCount: {}", store.refCount()); + IOUtils.close(searcherManager); + } catch (Throwable t) { + logger.warn("shadow replica failed to close searcher manager", t); + } finally { + store.decRef(); + } + } + } +} diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 825d472c89437..8b405b903c76b 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -34,6 +34,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -44,7 +45,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -54,7 +54,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.aliases.IndexAliasesService; -import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; import org.elasticsearch.index.cache.filter.FilterCacheStats; @@ -89,9 +88,9 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.Store.MetadataSnapshot; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreStats; -import org.elasticsearch.index.store.Store.MetadataSnapshot; import org.elasticsearch.index.suggest.stats.ShardSuggestService; import org.elasticsearch.index.suggest.stats.SuggestStats; import org.elasticsearch.index.termvectors.ShardTermVectorsService; @@ -131,7 +130,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final InternalIndicesLifecycle indicesLifecycle; private final Store store; private final MergeSchedulerProvider mergeScheduler; - private final AtomicReference currentEngineReference = new AtomicReference<>(); 
private final Translog translog; private final IndexAliasesService indexAliasesService; private final ShardIndexingService indexingService; @@ -152,16 +150,17 @@ public class IndexShard extends AbstractIndexShardComponent { private final Object mutex = new Object(); private final String checkIndexOnStartup; - private final EngineConfig config; - private final EngineFactory engineFactory; private long checkIndexTook = 0; - private volatile IndexShardState state; private TimeValue refreshInterval; private volatile ScheduledFuture refreshScheduledFuture; private volatile ScheduledFuture mergeScheduleFuture; - private volatile ShardRouting shardRouting; + protected volatile ShardRouting shardRouting; + protected volatile IndexShardState state; + protected final AtomicReference currentEngineReference = new AtomicReference<>(); + protected final EngineConfig config; + protected final EngineFactory engineFactory; @Nullable private RecoveryState recoveryState; @@ -180,7 +179,7 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache, - @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, AnalysisService analysisService, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) { + @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) { super(shardId, indexSettings); Preconditions.checkNotNull(store, "Store must be provided to the index shard"); Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard"); @@ -866,7 +865,7 @@ private void writeAllowed(Engine.Operation.Origin origin) throws IllegalIndexSha } } - private void verifyStartedOrRecovering() throws IllegalIndexShardStateException { + protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering"); @@ -880,7 +879,7 @@ private void verifyNotClosed() throws IllegalIndexShardStateException { } } - private void verifyStarted() throws IllegalIndexShardStateException { + protected final void verifyStarted() throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read if (state != IndexShardState.STARTED) { throw new IndexShardNotStartedException(shardId, state); @@ -1142,13 +1141,13 @@ public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable f } } - private void createNewEngine() { + protected void 
createNewEngine() { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new EngineClosedException(shardId); } assert this.currentEngineReference.get() == null; - this.currentEngineReference.set(engineFactory.newEngine(config)); + this.currentEngineReference.set(engineFactory.newReadWriteEngine(config)); } } } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index 0932d12eb4379..2ba09533eae3d 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.shard; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.EngineFactory; @@ -26,27 +27,43 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService; /** - * + * The {@code IndexShardModule} module is responsible for binding the correct + * shard id, index shard, engine factory, and warming service for a newly + * created shard. */ public class IndexShardModule extends AbstractModule { public static final String ENGINE_FACTORY = "index.engine.factory"; private static final Class DEFAULT_ENGINE_FACTORY_CLASS = InternalEngineFactory.class; + private static final String ENGINE_PREFIX = "org.elasticsearch.index.engine."; + private static final String ENGINE_SUFFIX = "EngineFactory"; + private final ShardId shardId; private final Settings settings; + private final boolean primary; - public IndexShardModule(ShardId shardId, Settings settings) { + public IndexShardModule(ShardId shardId, boolean primary, Settings settings) { this.settings = settings; this.shardId = shardId; + this.primary = primary; + } + + /** Return true if a shadow engine should be used */ + protected boolean useShadowEngine() { + return primary == false && IndexMetaData.isIndexUsingShadowReplicas(settings); } @Override protected void configure() { bind(ShardId.class).toInstance(shardId); - bind(IndexShard.class).asEagerSingleton(); - bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS, - "org.elasticsearch.index.engine.", "EngineFactory")); + if (useShadowEngine()) { + bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton(); + } else { + bind(IndexShard.class).asEagerSingleton(); + } + + bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS, ENGINE_PREFIX, ENGINE_SUFFIX)); bind(ShardIndexWarmerService.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java new file mode 100644 index 0000000000000..45f5ed3ac49da --- /dev/null +++ b/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.aliases.IndexAliasesService; +import org.elasticsearch.index.cache.IndexCache; +import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; +import org.elasticsearch.index.cache.filter.ShardFilterCache; +import org.elasticsearch.index.cache.query.ShardQueryCache; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; +import org.elasticsearch.index.engine.EngineClosedException; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.fielddata.IndexFieldDataService; +import org.elasticsearch.index.fielddata.ShardFieldData; +import org.elasticsearch.index.get.ShardGetService; +import org.elasticsearch.index.indexing.ShardIndexingService; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.merge.policy.MergePolicyProvider; +import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; +import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; +import org.elasticsearch.index.percolator.stats.ShardPercolateService; +import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.search.stats.ShardSearchService; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.settings.IndexSettingsService; +import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.suggest.stats.ShardSuggestService; +import org.elasticsearch.index.termvectors.ShardTermVectorsService; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.warmer.ShardIndexWarmerService; +import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.indices.IndicesWarmer; +import org.elasticsearch.threadpool.ThreadPool; + +/** + * ShadowIndexShard extends {@link IndexShard} to add file synchronization + * from the primary when a flush happens. It also ensures that a replica being + * promoted to a primary causes the shard to fail, kicking off a re-allocation + * of the primary shard. 
+ */ +public class ShadowIndexShard extends IndexShard { + + private final Object mutex = new Object(); + + @Inject + public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, + IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, + Translog translog, ThreadPool threadPool, MapperService mapperService, + IndexQueryParserService queryParserService, IndexCache indexCache, + IndexAliasesService indexAliasesService, ShardIndexingService indexingService, + ShardGetService getService, ShardSearchService searchService, + ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, + ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, + ShardPercolateService shardPercolateService, CodecService codecService, + ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, + IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, + ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer, + SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, + MergePolicyProvider mergePolicyProvider, EngineFactory factory) { + super(shardId, indexSettings, indexSettingsService, indicesLifecycle, store, mergeScheduler, + translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, + indexingService, getService, searchService, shardWarmerService, shardFilterCache, + shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService, + termVectorsService, indexFieldDataService, indexService, shardSuggestService, + shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService, + mergePolicyProvider, factory); + } + + /** + * In addition to the regular accounting done in + * {@link IndexShard#routingEntry(org.elasticsearch.cluster.routing.ShardRouting)}, + * if this shadow replica needs to be promoted to a primary, the shard is + * failed in order to allow a new primary to be re-allocated. 
+     */
+    @Override
+    public IndexShard routingEntry(ShardRouting newRouting) {
+        ShardRouting shardRouting = this.routingEntry();
+        super.routingEntry(newRouting);
+        // check for a shadow replica that now needs to be transformed into
+        // a normal primary; today we simply fail it to force reallocation
+        if (shardRouting != null && shardRouting.primary() == false && // currently a replica
+                newRouting.primary() == true) { // becoming a primary
+            failShard("can't promote shadow replica to primary",
+                    new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
+        }
+        return this;
+    }
+
+    @Override
+    protected void createNewEngine() {
+        synchronized (mutex) {
+            if (state == IndexShardState.CLOSED) {
+                throw new EngineClosedException(shardId);
+            }
+            assert this.currentEngineReference.get() == null;
+            assert this.shardRouting.primary() == false;
+            // Use the read-only engine for shadow replicas
+            this.currentEngineReference.set(engineFactory.newReadOnlyEngine(config));
+        }
+    }
+}
diff --git a/src/main/java/org/elasticsearch/index/store/DirectoryUtils.java b/src/main/java/org/elasticsearch/index/store/DirectoryUtils.java
index d06872a4dd202..ffa69c76ec78a 100644
--- a/src/main/java/org/elasticsearch/index/store/DirectoryUtils.java
+++ b/src/main/java/org/elasticsearch/index/store/DirectoryUtils.java
@@ -31,10 +31,13 @@ public final class DirectoryUtils {
 
     private DirectoryUtils() {} // no instance
 
-    static final Directory getLeafDirectory(FilterDirectory dir) {
+    static final Directory getLeafDirectory(FilterDirectory dir, Class targetClass) {
         Directory current = dir.getDelegate();
         while (true) {
             if ((current instanceof FilterDirectory)) {
+                if (targetClass != null && targetClass.isAssignableFrom(current.getClass())) {
+                    break;
+                }
                 current = ((FilterDirectory) current).getDelegate();
             } else {
                 break;
@@ -59,7 +62,7 @@ public static T getLeaf(Directory dir, Class targetClas
     public static T getLeaf(Directory dir, Class targetClass, T defaultValue) {
         Directory d = dir;
         if (dir instanceof FilterDirectory) {
-            d = getLeafDirectory((FilterDirectory) dir);
+            d = getLeafDirectory((FilterDirectory) dir, targetClass);
         }
         if (d instanceof FileSwitchDirectory) {
             T leaf = getLeaf(((FileSwitchDirectory) d).getPrimaryDir(), targetClass);
diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java
index a3b1df54db76b..f0061984b5171 100644
--- a/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -481,7 +481,9 @@ private void deleteIndexStore(String reason, Index index, Settings indexSettings
             // the store metadata gets wiped anyway even without the lock this is just best effort since
             // every shard deletes its content under the shard lock it owns.
logger.debug("{} deleting index store reason [{}]", index, reason); - nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings); + if (canDeleteIndexContents(index, indexSettings)) { + nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings); + } } catch (LockObtainFailedException ex) { logger.debug("{} failed to delete index store - at least one shards is still locked", ex, index); } catch (Exception ex) { @@ -502,9 +504,6 @@ private void deleteIndexStore(String reason, Index index, Settings indexSettings */ public void deleteShardStore(String reason, ShardLock lock, Settings indexSettings) throws IOException { ShardId shardId = lock.getShardId(); - if (canDeleteShardContent(shardId, indexSettings) == false) { - throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId); - } logger.trace("{} deleting shard reason [{}]", shardId, reason); nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings); } @@ -527,6 +526,26 @@ public void deleteShardStore(String reason, ShardId shardId, IndexMetaData metaD logger.trace("{} deleting shard reason [{}]", shardId, reason); } + /** + * This method returns true if the current node is allowed to delete the + * given index. If the index uses a shared filesystem this method always + * returns false. + * @param index {@code Index} to check whether deletion is allowed + * @param indexSettings {@code Settings} for the given index + * @return true if the index can be deleted on this node + */ + public boolean canDeleteIndexContents(Index index, Settings indexSettings) { + final Tuple indexServiceInjectorTuple = this.indices.get(index); + if (IndexMetaData.isOnSharedFilesystem(indexSettings) == false) { + if (indexServiceInjectorTuple == null && nodeEnv.hasNodeFile()) { + return true; + } + } else { + logger.trace("{} skipping index directory deletion due to shadow replicas", index); + } + return false; + } + /** * Returns true iff the shards content for the given shard can be deleted. * This method will return false if: @@ -550,13 +569,16 @@ public boolean canDeleteShardContent(ShardId shardId, IndexMetaData metaData) { private boolean canDeleteShardContent(ShardId shardId, @IndexSettings Settings indexSettings) { final Tuple indexServiceInjectorTuple = this.indices.get(shardId.getIndex()); - // TODO add some protection here to prevent shard deletion if we are on a shard FS or have ShadowReplicas enabled. 
- if (indexServiceInjectorTuple != null && nodeEnv.hasNodeFile()) { - final IndexService indexService = indexServiceInjectorTuple.v1(); - return indexService.hasShard(shardId.id()) == false; - } else if (nodeEnv.hasNodeFile()) { - final Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings); - return FileSystemUtils.exists(shardLocations); + if (IndexMetaData.isOnSharedFilesystem(indexSettings) == false) { + if (indexServiceInjectorTuple != null && nodeEnv.hasNodeFile()) { + final IndexService indexService = indexServiceInjectorTuple.v1(); + return indexService.hasShard(shardId.id()) == false; + } else if (nodeEnv.hasNodeFile()) { + final Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings); + return FileSystemUtils.exists(shardLocations); + } + } else { + logger.trace("{} skipping shard directory deletion due to shadow replicas", shardId); } return false; } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 0c4a0ac5620bb..b8972a6b82e57 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -686,7 +686,7 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco if (logger.isDebugEnabled()) { logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId); } - IndexShard indexShard = indexService.createShard(shardId); + IndexShard indexShard = indexService.createShard(shardId, shardRouting.primary()); indexShard.routingEntry(shardRouting); indexShard.addFailedEngineListener(failedEngineHandler); } catch (IndexShardAlreadyExistsException e) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index e6af0d9c42dce..9ab36b9915e98 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -22,6 +22,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -113,8 +114,12 @@ private RecoveryResponse recover(final StartRecoveryRequest request) { } logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated()); - - final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger); + final ShardRecoveryHandler handler; + if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) { + handler = new SharedFSRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger); + } else { + handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger); + } ongoingRecoveries.add(shard, handler); try { shard.recover(handler); diff --git 
a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index 26f4fd4b39de2..a67dd3199f6d8 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -76,9 +76,9 @@ * everything relating to copying the segment files as well as sending translog * operations across the wire once the segments have been copied. */ -public final class ShardRecoveryHandler implements Engine.RecoveryHandler { +public class ShardRecoveryHandler implements Engine.RecoveryHandler { - private final ESLogger logger; + protected final ESLogger logger; // Shard that is going to be recovered (the "source") private final IndexShard shard; private final String indexName; @@ -471,11 +471,12 @@ public void phase3(Translog.Snapshot snapshot) throws ElasticsearchException { throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); - logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode()); StopWatch stopWatch = new StopWatch().start(); + final int totalOperations; + logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode()); // Send the translog operations to the target node - int totalOperations = sendSnapshot(snapshot); + totalOperations = sendSnapshot(snapshot); cancellableThreads.execute(new Interruptable() { @Override @@ -579,7 +580,7 @@ public void run() throws InterruptedException { * * @return the total number of translog operations that were sent */ - private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException { + protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException { int ops = 0; long size = 0; int totalOperations = 0; diff --git a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoveryHandler.java new file mode 100644 index 0000000000000..fbadb276eae56 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoveryHandler.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+
+/**
+ * A recovery handler that skips phase 1 as well as sending the translog snapshot. During phase 3 the shard is marked
+ * as relocated and closed to ensure that the engine is closed and the target can acquire the IW write lock.
+ */
+public class SharedFSRecoveryHandler extends ShardRecoveryHandler {
+
+    private final IndexShard shard;
+    private final StartRecoveryRequest request;
+
+    public SharedFSRecoveryHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
+        super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
+        this.shard = shard;
+        this.request = request;
+    }
+
+    @Override
+    public void phase1(SnapshotIndexCommit snapshot) throws ElasticsearchException {
+        if (request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary()) {
+            // here we simply fail the primary shard since we can't move it (we can't have 2 writers open at the same time)
+            // by failing the shard we play safe and just go through the entire reallocation procedure of the primary
+            // it would be ideal to make sure we flushed the translog here but that is not possible in the current design.
+            ElasticsearchIllegalStateException exception = new ElasticsearchIllegalStateException("Can't relocate primary - failing");
+            shard.failShard("primary_relocation", exception);
+            throw exception;
+        }
+        logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
+    }
+
+    @Override
+    protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
+        logger.trace("{} recovery [phase3] to {}: skipping transaction log operations for file sync", shard.shardId(), request.targetNode());
+        return 0;
+    }
+
+}
diff --git a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
new file mode 100644
index 0000000000000..3850ac0f5d2ee
--- /dev/null
+++ b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.InternalTestCluster; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.List; + +import static com.google.common.collect.Lists.newArrayList; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +/** + * Tests for indices that use shadow replicas and a shared filesystem + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) +public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { + + @Test + public void testIndexWithFewDocuments() throws Exception { + Settings nodeSettings = ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .build(); + + internalCluster().startNodesAsync(3, nodeSettings).get(); + final String IDX = "test"; + final Path dataPath = newTempDirPath(); + + Settings idxSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString()) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .build(); + + prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get(); + ensureGreen(IDX); + + // So basically, the primary should fail and the replica will need to + // replay the translog, this is what this tests + client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get(); + client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); + + // Check that we can get doc 1 and 2, because we are doing realtime + // gets and getting from the primary + GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setRealtime(true).setFields("foo").get(); + GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setRealtime(true).setFields("foo").get(); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + + flushAndRefresh(IDX); + client().prepareIndex(IDX, "doc", "3").setSource("foo", "bar").get(); + 
client().prepareIndex(IDX, "doc", "4").setSource("foo", "bar").get(); + refresh(); + + // Check that we can get doc 1 and 2 without realtime + gResp1 = client().prepareGet(IDX, "doc", "1").setRealtime(false).setFields("foo").get(); + gResp2 = client().prepareGet(IDX, "doc", "2").setRealtime(false).setFields("foo").get(); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + + logger.info("--> restarting both nodes"); + if (randomBoolean()) { + logger.info("--> rolling restart"); + internalCluster().rollingRestart(); + } else { + logger.info("--> full restart"); + internalCluster().fullRestart(); + } + + client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + ensureGreen(IDX); + + logger.info("--> performing query"); + SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get(); + assertHitCount(resp, 4); + + logger.info("--> deleting index"); + assertAcked(client().admin().indices().prepareDelete(IDX)); + } + + @Test + public void testReplicaToPrimaryPromotion() throws Exception { + Settings nodeSettings = ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .build(); + + String node1 = internalCluster().startNode(nodeSettings); + Path dataPath = newTempDirPath(); + String IDX = "test"; + + Settings idxSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString()) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .build(); + + prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get(); + ensureYellow(IDX); + client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get(); + client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); + + GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get(); + GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get(); + assertTrue(gResp1.isExists()); + assertTrue(gResp2.isExists()); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + + // Node1 has the primary, now node2 has the replica + String node2 = internalCluster().startNode(nodeSettings); + ensureGreen(IDX); + client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + flushAndRefresh(IDX); + + logger.info("--> stopping node1 [{}]", node1); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1)); + ensureYellow(IDX); + + logger.info("--> performing query"); + SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get(); + assertHitCount(resp, 2); + + gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get(); + gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get(); + assertTrue(gResp1.isExists()); + assertTrue(gResp2.toString(), gResp2.isExists()); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + } + + @Test + public void testPrimaryRelocation() throws Exception { + Settings nodeSettings = ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .build(); + + 
String node1 = internalCluster().startNode(nodeSettings); + Path dataPath = newTempDirPath(); + String IDX = "test"; + + Settings idxSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString()) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .build(); + + prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get(); + ensureYellow(IDX); + client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get(); + client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); + + GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get(); + GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get(); + assertTrue(gResp1.isExists()); + assertTrue(gResp2.isExists()); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + + // Node1 has the primary, now node2 has the replica + String node2 = internalCluster().startNode(nodeSettings); + ensureGreen(IDX); + client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + flushAndRefresh(IDX); + + // now prevent primary from being allocated on node 1 move to node_3 + String node3 = internalCluster().startNode(nodeSettings); + Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build(); + client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet(); + + ensureGreen(IDX); + logger.info("--> performing query"); + SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get(); + assertHitCount(resp, 2); + + gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get(); + gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get(); + assertTrue(gResp1.isExists()); + assertTrue(gResp2.toString(), gResp2.isExists()); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + + client().prepareIndex(IDX, "doc", "3").setSource("foo", "bar").get(); + client().prepareIndex(IDX, "doc", "4").setSource("foo", "bar").get(); + gResp1 = client().prepareGet(IDX, "doc", "3").setPreference("_primary").setFields("foo").get(); + gResp2 = client().prepareGet(IDX, "doc", "4").setPreference("_primary").setFields("foo").get(); + assertTrue(gResp1.isExists()); + assertTrue(gResp2.isExists()); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + } + + @Test + public void testIndexWithShadowReplicasCleansUp() throws Exception { + Settings nodeSettings = ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .build(); + + int nodeCount = randomIntBetween(2, 5); + internalCluster().startNodesAsync(nodeCount, nodeSettings).get(); + Path dataPath = newTempDirPath(); + String IDX = "test"; + + Settings idxSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, nodeCount - 1)) + .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString()) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + 
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .build(); + + prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get(); + ensureGreen(IDX); + client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get(); + client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); + flushAndRefresh(IDX); + + GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get(); + GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get(); + assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar")); + assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar")); + + logger.info("--> performing query"); + SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get(); + assertHitCount(resp, 2); + + assertAcked(client().admin().indices().prepareDelete(IDX)); + + assertPathHasBeenCleared(dataPath); + } + + /** + * Tests that shadow replicas can be "naturally" rebalanced and relocated + * around the cluster. By "naturally" I mean without using the reroute API + * @throws Exception + */ + @Test + public void testShadowReplicaNaturalRelocation() throws Exception { + Settings nodeSettings = ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .build(); + + internalCluster().startNodesAsync(2, nodeSettings).get(); + Path dataPath = newTempDirPath(); + String IDX = "test"; + + Settings idxSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 10) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString()) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .build(); + + prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get(); + ensureGreen(IDX); + + int docCount = randomIntBetween(10, 100); + List builders = newArrayList(); + for (int i = 0; i < docCount; i++) { + builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar")); + } + indexRandom(true, true, true, builders); + flushAndRefresh(IDX); + + // start a third node, with 10 shards each on the other nodes, they + // should relocate some to the third node + final String node3 = internalCluster().startNode(nodeSettings); + + assertBusy(new Runnable() { + @Override + public void run() { + client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + ClusterStateResponse resp = client().admin().cluster().prepareState().get(); + RoutingNodes nodes = resp.getState().getRoutingNodes(); + for (RoutingNode node : nodes) { + logger.info("--> node has {} shards", node.numberOfOwningShards()); + assertThat("at least 5 shards on node", node.numberOfOwningShards(), greaterThanOrEqualTo(5)); + } + } + }); + ensureGreen(IDX); + + logger.info("--> performing query"); + SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get(); + assertHitCount(resp, docCount); + + assertAcked(client().admin().indices().prepareDelete(IDX)); + + assertPathHasBeenCleared(dataPath); + } + + @Test + public void testShadowReplicasUsingFieldData() throws Exception { + Settings nodeSettings = ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .build(); + + internalCluster().startNodesAsync(3, nodeSettings).get(); + Path dataPath = newTempDirPath(); + String IDX = "test"; + + Settings idxSettings = 
ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString()) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .build(); + + prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string,index=not_analyzed").get(); + ensureGreen(IDX); + + client().prepareIndex(IDX, "doc", "1").setSource("foo", "foo").get(); + client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); + client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get(); + client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get(); + flushAndRefresh(IDX); + + SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).addFieldDataField("foo").addSort("foo", SortOrder.ASC).get(); + assertHitCount(resp, 4); + assertOrderedSearchHits(resp, "2", "3", "4", "1"); + SearchHit[] hits = resp.getHits().hits(); + assertThat(hits[0].field("foo").getValue().toString(), equalTo("bar")); + assertThat(hits[1].field("foo").getValue().toString(), equalTo("baz")); + assertThat(hits[2].field("foo").getValue().toString(), equalTo("eggplant")); + assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo")); + } +} diff --git a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java new file mode 100644 index 0000000000000..b4be7436d0897 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -0,0 +1,919 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.index.engine;
+
+import com.carrotsearch.randomizedtesting.LifecycleScope;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.*;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
+import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.indexing.ShardIndexingService;
+import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
+import org.elasticsearch.index.mapper.ParseContext;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
+import org.elasticsearch.index.mapper.internal.UidFieldMapper;
+import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
+import org.elasticsearch.index.merge.policy.MergePolicyProvider;
+import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
+import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
+import org.elasticsearch.index.settings.IndexSettingsService;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardUtils;
+import org.elasticsearch.index.store.DirectoryService;
+import org.elasticsearch.index.store.DirectoryUtils;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.fs.FsTranslog;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ElasticsearchLuceneTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
+import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
+import static org.elasticsearch.test.ElasticsearchTestCase.newTempDirPath;
+import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * Tests for the ShadowEngine: a primary InternalEngine and a ShadowEngine share
+ * the same on-disk directory; writes go through the primary only, and become
+ * visible to the shadow engine after a primary flush followed by a refresh.
+ */ +public class ShadowEngineTests extends ElasticsearchLuceneTestCase { + + protected final ShardId shardId = new ShardId(new Index("index"), 1); + + protected ThreadPool threadPool; + + private Store store; + private Store storeReplica; + + protected Translog translog; + protected Translog replicaTranslog; + + protected Engine primaryEngine; + protected Engine replicaEngine; + + private Settings defaultSettings; + private int indexConcurrency; + private String codecName; + private Path dirPath; + + @Before + public void setUp() throws Exception { + super.setUp(); + CodecService codecService = new CodecService(shardId.index()); + indexConcurrency = randomIntBetween(1, 20); + String name = Codec.getDefault().getName(); + if (Arrays.asList(codecService.availableCodecs()).contains(name)) { + // some codecs are read only so we only take the ones that we have in the service and randomly + // selected by lucene test case. + codecName = name; + } else { + codecName = "default"; + } + defaultSettings = ImmutableSettings.builder() + .put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, randomBoolean()) + .put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us + .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, randomBoolean()) + .put(EngineConfig.INDEX_CODEC_SETTING, codecName) + .put(EngineConfig.INDEX_CONCURRENCY_SETTING, indexConcurrency) + .build(); // TODO randomize more settings + threadPool = new ThreadPool(getClass().getName()); + dirPath = newTempDirPath(LifecycleScope.TEST); + store = createStore(dirPath); + store.deleteContent(); + storeReplica = createStore(dirPath); + storeReplica.deleteContent(); + translog = createTranslog(); + primaryEngine = createInternalEngine(store, translog); + LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)primaryEngine).getCurrentIndexWriterConfig(); + + assertEquals(primaryEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); + if (randomBoolean()) { + primaryEngine.config().setEnableGcDeletes(false); + } + + replicaTranslog = createTranslogReplica(); + replicaEngine = createShadowEngine(storeReplica, replicaTranslog); + + assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); + if (randomBoolean()) { + replicaEngine.config().setEnableGcDeletes(false); + } + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + replicaEngine.close(); + storeReplica.close(); + + primaryEngine.close(); + store.close(); + terminate(threadPool); + } + + private ParseContext.Document testDocumentWithTextField() { + ParseContext.Document document = testDocument(); + document.add(new TextField("value", "test", Field.Store.YES)); + return document; + } + + private ParseContext.Document testDocument() { + return new ParseContext.Document(); + } + + + private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, ParseContext.Document document, BytesReference source, boolean mappingsModified) { + Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE); + Field versionField = new NumericDocValuesField("_version", 0); + document.add(uidField); + document.add(versionField); + return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingsModified); + } + + protected Store createStore(Path p) throws 
IOException { + return createStore(newMockFSDirectory(p)); + } + + protected Store createStore(final Directory directory) throws IOException { + final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) { + @Override + public Directory[] build() throws IOException { + return new Directory[]{ directory }; + } + + @Override + public long throttleTimeInNanos() { + return 0; + } + }; + return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); + } + + protected Translog createTranslog() throws IOException { + return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/")); + } + + protected Translog createTranslogReplica() throws IOException { + return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/")); + } + + protected IndexDeletionPolicy createIndexDeletionPolicy() { + return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS); + } + + protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() { + return new SnapshotDeletionPolicy(createIndexDeletionPolicy()); + } + + protected MergePolicyProvider createMergePolicy() { + return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS)); + } + + protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) { + return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService); + } + + protected ShadowEngine createShadowEngine(Store store, Translog translog) { + IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); + return createShadowEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService)); + } + + protected InternalEngine createInternalEngine(Store store, Translog translog) { + IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); + return createInternalEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService)); + } + + protected ShadowEngine createShadowEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { + return new ShadowEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider)); + } + + protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { + return new InternalEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider)); + } + + public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { + IndexWriterConfig iwc = newIndexWriterConfig(); + EngineConfig config = new EngineConfig(shardId, false/*per default optimization for auto generated ids is disabled*/, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService + , null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider, + iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new 
Engine.FailedEngineListener() { + @Override + public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { + // we don't need to notify anybody in this test + } + }); + + + return config; + } + + protected Term newUid(String id) { + return new Term("_uid", id); + } + + protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); + protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); + protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); + + @Test + public void testSegments() throws Exception { + List segments = primaryEngine.segments(false); + assertThat(segments.isEmpty(), equalTo(true)); + assertThat(primaryEngine.segmentsStats().getCount(), equalTo(0l)); + assertThat(primaryEngine.segmentsStats().getMemoryInBytes(), equalTo(0l)); + final boolean defaultCompound = defaultSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true); + + // create a doc and refresh + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + + ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false); + primaryEngine.create(new Engine.Create(null, newUid("2"), doc2)); + primaryEngine.refresh("test"); + + segments = primaryEngine.segments(false); + assertThat(segments.size(), equalTo(1)); + SegmentsStats stats = primaryEngine.segmentsStats(); + assertThat(stats.getCount(), equalTo(1l)); + assertThat(stats.getTermsMemoryInBytes(), greaterThan(0l)); + assertThat(stats.getStoredFieldsMemoryInBytes(), greaterThan(0l)); + assertThat(stats.getTermVectorsMemoryInBytes(), equalTo(0l)); + assertThat(stats.getNormsMemoryInBytes(), greaterThan(0l)); + assertThat(stats.getDocValuesMemoryInBytes(), greaterThan(0l)); + assertThat(segments.get(0).isCommitted(), equalTo(false)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(2)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + assertThat(segments.get(0).ramTree, nullValue()); + + // Check that the replica sees nothing + segments = replicaEngine.segments(false); + assertThat(segments.size(), equalTo(0)); + stats = replicaEngine.segmentsStats(); + assertThat(stats.getCount(), equalTo(0l)); + assertThat(stats.getTermsMemoryInBytes(), equalTo(0l)); + assertThat(stats.getStoredFieldsMemoryInBytes(), equalTo(0l)); + assertThat(stats.getTermVectorsMemoryInBytes(), equalTo(0l)); + assertThat(stats.getNormsMemoryInBytes(), equalTo(0l)); + assertThat(stats.getDocValuesMemoryInBytes(), equalTo(0l)); + assertThat(segments.size(), equalTo(0)); + + // flush the primary engine + primaryEngine.flush(); + // refresh the replica + replicaEngine.refresh("tests"); + + // Check that the primary AND replica sees segments now + segments = primaryEngine.segments(false); + assertThat(segments.size(), equalTo(1)); + assertThat(primaryEngine.segmentsStats().getCount(), equalTo(1l)); + assertThat(segments.get(0).isCommitted(), equalTo(true)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(2)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + + segments = replicaEngine.segments(false); + assertThat(segments.size(), equalTo(1)); + 
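+        // the shadow replica now reads the same committed segments from the shared
+        // directory, so its stats mirror the primary's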
assertThat(replicaEngine.segmentsStats().getCount(), equalTo(1l)); + assertThat(segments.get(0).isCommitted(), equalTo(true)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(2)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + + + primaryEngine.config().setCompoundOnFlush(false); + + ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); + primaryEngine.create(new Engine.Create(null, newUid("3"), doc3)); + primaryEngine.refresh("test"); + + segments = primaryEngine.segments(false); + assertThat(segments.size(), equalTo(2)); + assertThat(primaryEngine.segmentsStats().getCount(), equalTo(2l)); + assertThat(primaryEngine.segmentsStats().getTermsMemoryInBytes(), greaterThan(stats.getTermsMemoryInBytes())); + assertThat(primaryEngine.segmentsStats().getStoredFieldsMemoryInBytes(), greaterThan(stats.getStoredFieldsMemoryInBytes())); + assertThat(primaryEngine.segmentsStats().getTermVectorsMemoryInBytes(), equalTo(0l)); + assertThat(primaryEngine.segmentsStats().getNormsMemoryInBytes(), greaterThan(stats.getNormsMemoryInBytes())); + assertThat(primaryEngine.segmentsStats().getDocValuesMemoryInBytes(), greaterThan(stats.getDocValuesMemoryInBytes())); + assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true)); + assertThat(segments.get(0).isCommitted(), equalTo(true)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(2)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + assertThat(segments.get(1).isCommitted(), equalTo(false)); + assertThat(segments.get(1).isSearch(), equalTo(true)); + assertThat(segments.get(1).getNumDocs(), equalTo(1)); + assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(1).isCompound(), equalTo(false)); + + // Make visible to shadow replica + primaryEngine.flush(); + replicaEngine.refresh("test"); + + segments = replicaEngine.segments(false); + assertThat(segments.size(), equalTo(2)); + assertThat(replicaEngine.segmentsStats().getCount(), equalTo(2l)); + assertThat(replicaEngine.segmentsStats().getTermsMemoryInBytes(), greaterThan(stats.getTermsMemoryInBytes())); + assertThat(replicaEngine.segmentsStats().getStoredFieldsMemoryInBytes(), greaterThan(stats.getStoredFieldsMemoryInBytes())); + assertThat(replicaEngine.segmentsStats().getTermVectorsMemoryInBytes(), equalTo(0l)); + assertThat(replicaEngine.segmentsStats().getNormsMemoryInBytes(), greaterThan(stats.getNormsMemoryInBytes())); + assertThat(replicaEngine.segmentsStats().getDocValuesMemoryInBytes(), greaterThan(stats.getDocValuesMemoryInBytes())); + assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true)); + assertThat(segments.get(0).isCommitted(), equalTo(true)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(2)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + assertThat(segments.get(1).isCommitted(), equalTo(true)); + assertThat(segments.get(1).isSearch(), equalTo(true)); + assertThat(segments.get(1).getNumDocs(), equalTo(1)); + assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(1).isCompound(), 
equalTo(false)); + + primaryEngine.delete(new Engine.Delete("test", "1", newUid("1"))); + primaryEngine.refresh("test"); + + segments = primaryEngine.segments(false); + assertThat(segments.size(), equalTo(2)); + assertThat(primaryEngine.segmentsStats().getCount(), equalTo(2l)); + assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true)); + assertThat(segments.get(0).isCommitted(), equalTo(true)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(1)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(1)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + assertThat(segments.get(1).isCommitted(), equalTo(true)); + assertThat(segments.get(1).isSearch(), equalTo(true)); + assertThat(segments.get(1).getNumDocs(), equalTo(1)); + assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(1).isCompound(), equalTo(false)); + + // Make visible to shadow replica + primaryEngine.flush(); + replicaEngine.refresh("test"); + + primaryEngine.config().setCompoundOnFlush(true); + ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); + primaryEngine.create(new Engine.Create(null, newUid("4"), doc4)); + primaryEngine.refresh("test"); + + segments = primaryEngine.segments(false); + assertThat(segments.size(), equalTo(3)); + assertThat(primaryEngine.segmentsStats().getCount(), equalTo(3l)); + assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true)); + assertThat(segments.get(0).isCommitted(), equalTo(true)); + assertThat(segments.get(0).isSearch(), equalTo(true)); + assertThat(segments.get(0).getNumDocs(), equalTo(1)); + assertThat(segments.get(0).getDeletedDocs(), equalTo(1)); + assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); + + assertThat(segments.get(1).isCommitted(), equalTo(true)); + assertThat(segments.get(1).isSearch(), equalTo(true)); + assertThat(segments.get(1).getNumDocs(), equalTo(1)); + assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(1).isCompound(), equalTo(false)); + + assertThat(segments.get(2).isCommitted(), equalTo(false)); + assertThat(segments.get(2).isSearch(), equalTo(true)); + assertThat(segments.get(2).getNumDocs(), equalTo(1)); + assertThat(segments.get(2).getDeletedDocs(), equalTo(0)); + assertThat(segments.get(2).isCompound(), equalTo(true)); + } + + @Test + public void testVerboseSegments() throws Exception { + List segments = primaryEngine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + primaryEngine.refresh("test"); + + segments = primaryEngine.segments(true); + assertThat(segments.size(), equalTo(1)); + assertThat(segments.get(0).ramTree, notNullValue()); + + ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false); + primaryEngine.create(new Engine.Create(null, newUid("2"), doc2)); + primaryEngine.refresh("test"); + ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); + primaryEngine.create(new Engine.Create(null, newUid("3"), doc3)); + primaryEngine.refresh("test"); + + segments = primaryEngine.segments(true); + assertThat(segments.size(), equalTo(3)); + 
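+        // segments(true) requests verbose details, so every segment should expose a ramTree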
assertThat(segments.get(0).ramTree, notNullValue()); + assertThat(segments.get(1).ramTree, notNullValue()); + assertThat(segments.get(2).ramTree, notNullValue()); + + // Now make the changes visible to the replica + primaryEngine.flush(); + replicaEngine.refresh("test"); + + segments = replicaEngine.segments(true); + assertThat(segments.size(), equalTo(3)); + assertThat(segments.get(0).ramTree, notNullValue()); + assertThat(segments.get(1).ramTree, notNullValue()); + assertThat(segments.get(2).ramTree, notNullValue()); + + } + + @Test + public void testShadowEngineIgnoresWriteOperations() throws Exception { + // create a document + ParseContext.Document document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false); + try { + replicaEngine.create(new Engine.Create(null, newUid("1"), doc)); + fail("should have thrown an exception"); + } catch (UnsupportedOperationException e) {} + replicaEngine.refresh("test"); + + // its not there... + Engine.Searcher searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + searchResult.close(); + Engine.GetResult getResult = replicaEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + + // index a document + document = testDocument(); + document.add(new TextField("value", "test1", Field.Store.YES)); + doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false); + try { + replicaEngine.index(new Engine.Index(null, newUid("1"), doc)); + fail("should have thrown an exception"); + } catch (UnsupportedOperationException e) {} + replicaEngine.refresh("test"); + + // its still not there... 
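+        // (the shadow engine rejected the index operation outright, so a
+        // refresh has nothing new to make visible)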
+ searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + searchResult.close(); + getResult = replicaEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + + // Now, add a document to the primary so we can test shadow engine deletes + document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); + doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + primaryEngine.flush(); + replicaEngine.refresh("test"); + + // Now the replica can see it + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + searchResult.close(); + + // And the replica can retrieve it + getResult = replicaEngine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + getResult.release(); + + // try to delete it on the replica + try { + replicaEngine.delete(new Engine.Delete("test", "1", newUid("1"))); + fail("should have thrown an exception"); + } catch (UnsupportedOperationException e) {} + replicaEngine.flush(); + replicaEngine.refresh("test"); + primaryEngine.refresh("test"); + + // it's still there! + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + searchResult.close(); + getResult = replicaEngine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + getResult.release(); + + // it's still there on the primary also! 
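+        // (the delete was rejected by the shadow engine, so it was never
+        // applied anywhere)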
+ searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + searchResult.close(); + getResult = primaryEngine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + getResult.release(); + } + + @Test + public void testSimpleOperations() throws Exception { + Engine.Searcher searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + searchResult.close(); + + // create a document + ParseContext.Document document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + + // its not there... + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + searchResult.close(); + + // not on the replica either... + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + searchResult.close(); + + // but, we can still get it (in realtime) + Engine.GetResult getResult = primaryEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source().source.toBytesArray(), equalTo(B_1.toBytesArray())); + assertThat(getResult.docIdAndVersion(), nullValue()); + getResult.release(); + + // can't get it from the replica, because it's not in the translog for a shadow replica + getResult = replicaEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + + // but, not there non realtime + getResult = primaryEngine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + // refresh and it should be there + primaryEngine.refresh("test"); + + // now its there... 
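+        // (the refresh above made the primary's in-memory segment searchable)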
+ searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + searchResult.close(); + + // also in non realtime + getResult = primaryEngine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + getResult.release(); + + // still not in the replica because no flush + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + searchResult.close(); + + // now do an update + document = testDocument(); + document.add(new TextField("value", "test1", Field.Store.YES)); + document.add(new Field(SourceFieldMapper.NAME, B_2.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); + doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_2, false); + primaryEngine.index(new Engine.Index(null, newUid("1"), doc)); + + // its not updated yet... + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); + searchResult.close(); + + // but, we can still get it (in realtime) + getResult = primaryEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source().source.toBytesArray(), equalTo(B_2.toBytesArray())); + assertThat(getResult.docIdAndVersion(), nullValue()); + getResult.release(); + + // refresh and it should be updated + primaryEngine.refresh("test"); + + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1)); + searchResult.close(); + + // flush, now shadow replica should have the files + primaryEngine.flush(); + + // still not in the replica because the replica hasn't refreshed + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + searchResult.close(); + + replicaEngine.refresh("test"); + + // the replica finally sees it because primary has flushed and replica refreshed + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 
0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1)); + searchResult.close(); + + // now delete + primaryEngine.delete(new Engine.Delete("test", "1", newUid("1"))); + + // its not deleted yet + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1)); + searchResult.close(); + + // but, get should not see it (in realtime) + getResult = primaryEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + + // refresh and it should be deleted + primaryEngine.refresh("test"); + + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); + searchResult.close(); + + // add it back + document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); + doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + + // its not there... + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); + searchResult.close(); + + // refresh and it should be there + primaryEngine.refresh("test"); + + // now its there... + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); + searchResult.close(); + + // now flush + primaryEngine.flush(); + + // and, verify get (in real time) + getResult = primaryEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source(), nullValue()); + assertThat(getResult.docIdAndVersion(), notNullValue()); + getResult.release(); + + // the replica should see it if we refresh too! 
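+        // (the primary flushed above, so the segments already sit on the
+        // shared filesystem)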
+ replicaEngine.refresh("test"); + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); + searchResult.close(); + getResult = replicaEngine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source(), nullValue()); + assertThat(getResult.docIdAndVersion(), notNullValue()); + getResult.release(); + + // make sure we can still work with the engine + // now do an update + document = testDocument(); + document.add(new TextField("value", "test1", Field.Store.YES)); + doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false); + primaryEngine.index(new Engine.Index(null, newUid("1"), doc)); + + // its not updated yet... + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); + searchResult.close(); + + // refresh and it should be updated + primaryEngine.refresh("test"); + + searchResult = primaryEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1)); + searchResult.close(); + + // Make visible to shadow replica + primaryEngine.flush(); + replicaEngine.refresh("test"); + + searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1)); + searchResult.close(); + } + + @Test + public void testSearchResultRelease() throws Exception { + Engine.Searcher searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); + searchResult.close(); + + // create a document + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + + // its not there... 
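+        // (no refresh has happened yet, so neither engine can see the doc)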
+        searchResult = primaryEngine.acquireSearcher("test");
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
+        searchResult.close();
+        searchResult = replicaEngine.acquireSearcher("test");
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
+        searchResult.close();
+
+        // flush & refresh and it should be visible everywhere
+        primaryEngine.flush();
+        primaryEngine.refresh("test");
+        replicaEngine.refresh("test");
+
+        // now it's there...
+        searchResult = primaryEngine.acquireSearcher("test");
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
+        searchResult.close();
+
+        searchResult = replicaEngine.acquireSearcher("test");
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
+        MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
+        // don't release the replica search result yet...
+
+        // delete, refresh and do a new search, it should not be there
+        primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
+        primaryEngine.flush();
+        primaryEngine.refresh("test");
+        replicaEngine.refresh("test");
+        Engine.Searcher updateSearchResult = primaryEngine.acquireSearcher("test");
+        MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+        updateSearchResult.close();
+
+        // the non-released replica search result should not see the delete yet...
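+        // (it still holds the point-in-time reader acquired before the delete)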
+ MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + searchResult.close(); + } + + @Test + public void testFailEngineOnCorruption() { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + primaryEngine.flush(); + MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(replicaEngine.config().getStore().directory(), MockDirectoryWrapper.class); + leaf.setRandomIOExceptionRate(1.0); + leaf.setRandomIOExceptionRateOnOpen(1.0); + try { + replicaEngine.refresh("foo"); + fail("exception expected"); + } catch (Exception ex) { + + } + try { + Engine.Searcher searchResult = replicaEngine.acquireSearcher("test"); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); + MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); + searchResult.close(); + fail("exception expected"); + } catch (EngineClosedException ex) { + // all is well + } + } + + @Test + public void testExtractShardId() { + try (Engine.Searcher test = replicaEngine.acquireSearcher("test")) { + ShardId shardId = ShardUtils.extractShardId(test.reader()); + assertNotNull(shardId); + assertEquals(shardId, replicaEngine.config().getShardId()); + } + } + + /** + * Random test that throws random exception and ensures all references are + * counted down / released and resources are closed. + */ + @Test + public void testFailStart() throws IOException { + // Need a commit point for this + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); + primaryEngine.create(new Engine.Create(null, newUid("1"), doc)); + primaryEngine.flush(); + + // this test fails if any reader, searcher or directory is not closed - MDW FTW + final int iters = scaledRandomIntBetween(10, 100); + for (int i = 0; i < iters; i++) { + MockDirectoryWrapper wrapper = newMockFSDirectory(dirPath); + wrapper.setFailOnOpenInput(randomBoolean()); + wrapper.setAllowRandomFileNotFoundException(randomBoolean()); + wrapper.setRandomIOExceptionRate(randomDouble()); + wrapper.setRandomIOExceptionRateOnOpen(randomDouble()); + try (Store store = createStore(wrapper)) { + int refCount = store.refCount(); + assertTrue("refCount: "+ store.refCount(), store.refCount() > 0); + Translog translog = createTranslog(); + ShadowEngine holder; + try { + holder = createShadowEngine(store, translog); + } catch (EngineCreationFailureException ex) { + assertEquals(store.refCount(), refCount); + continue; + } + holder.config().setFailEngineOnCorruption(true); + assertEquals(store.refCount(), refCount+1); + final int numStarts = scaledRandomIntBetween(1, 5); + for (int j = 0; j < numStarts; j++) { + try { + assertEquals(store.refCount(), refCount + 1); + holder.close(); + holder = createShadowEngine(store, translog); + holder.config().setFailEngineOnCorruption(true); + assertEquals(store.refCount(), refCount + 1); + } catch (EngineCreationFailureException ex) { + // all is fine + assertEquals(store.refCount(), refCount); + break; + } + } + translog.close(); + holder.close(); + assertEquals(store.refCount(), refCount); + } + } + } + + @Test + public void testSettings() { + CodecService codecService = new 
CodecService(shardId.index()); + assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(replicaEngine.config().getIndexConcurrency(), indexConcurrency); + } +} diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java new file mode 100644 index 0000000000000..dff682569a6d7 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/shard/IndexShardModuleTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +/** Unit test(s) for IndexShardModule */ +public class IndexShardModuleTests extends ElasticsearchTestCase { + + @Test + public void testDetermineShadowEngineShouldBeUsed() { + ShardId shardId = new ShardId("myindex", 0); + Settings regularSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + + Settings shadowSettings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) + .build(); + + IndexShardModule ism1 = new IndexShardModule(shardId, true, regularSettings); + IndexShardModule ism2 = new IndexShardModule(shardId, false, regularSettings); + IndexShardModule ism3 = new IndexShardModule(shardId, true, shadowSettings); + IndexShardModule ism4 = new IndexShardModule(shardId, false, shadowSettings); + + assertFalse("no shadow replicas for normal settings", ism1.useShadowEngine()); + assertFalse("no shadow replicas for normal settings", ism2.useShadowEngine()); + assertFalse("no shadow replicas for primary shard with shadow settings", ism3.useShadowEngine()); + assertTrue("shadow replicas for replica shards with shadow settings", ism4.useShadowEngine()); + } +} diff --git a/src/test/java/org/elasticsearch/index/store/DirectoryUtilsTest.java b/src/test/java/org/elasticsearch/index/store/DirectoryUtilsTest.java index 7f0302356395c..7a9ee113a15d9 100644 --- a/src/test/java/org/elasticsearch/index/store/DirectoryUtilsTest.java +++ b/src/test/java/org/elasticsearch/index/store/DirectoryUtilsTest.java @@ -43,7 +43,7 @@ public void testGetLeave() throws IOException { BaseDirectoryWrapper dir = newFSDirectory(file); FSDirectory directory = DirectoryUtils.getLeaf(new FilterDirectory(dir) {}, FSDirectory.class, null); assertThat(directory, notNullValue()); - 
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
 
@@ -51,7 +51,7 @@ public void testGetLeave() throws IOException {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(dir, FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
 
@@ -60,7 +60,7 @@ public void testGetLeave() throws IOException {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(new FileSwitchDirectory(stringSet, dir, dir, random().nextBoolean()), FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
 
@@ -69,7 +69,7 @@ public void testGetLeave() throws IOException {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(new FilterDirectory(new FileSwitchDirectory(stringSet, dir, dir, random().nextBoolean())) {}, FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
 
diff --git a/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java b/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java
index 9e981d6ce5238..8bf52cbce7068 100644
--- a/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java
+++ b/src/test/java/org/elasticsearch/indices/IndicesCustomDataPathTests.java
@@ -137,23 +137,4 @@ public void testIndexCreatedWithCustomPathAndTemplate() throws Exception {
         assertAcked(client().admin().indices().prepareDelete(INDEX));
         assertPathHasBeenCleared(path);
     }
-
-    private void assertPathHasBeenCleared(String path) throws Exception {
-        int count = 0;
-        StringBuilder sb = new StringBuilder();
-        sb.append("[");
-        if (Files.exists(Paths.get(path))) {
-            try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(path))) {
-                for (Path file : stream) {
-                    if (Files.isRegularFile(file)) {
-                        count++;
-                        sb.append(file.toAbsolutePath().toString());
-                        sb.append("\n");
-                    }
-                }
-            }
-        }
-        sb.append("]");
-        assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
-    }
 }
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index 9739f26d3ae3c..3b1ea2a094080 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -125,8 +125,10 @@
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1790,6 +1792,39 @@ public Set<String> assertAllShardsOnNodes(String index, String... pattern) {
         return nodes;
     }
 
+    /**
+     * Asserts that there are no files in the specified path
+     */
+    public void assertPathHasBeenCleared(String path) throws Exception {
+        assertPathHasBeenCleared(Paths.get(path));
+    }
+
+    /**
+     * Asserts that there are no files in the specified path
+     */
+    public void assertPathHasBeenCleared(Path path) throws Exception {
+        logger.info("--> checking that [{}] has been cleared", path);
+        int count = 0;
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        if (Files.exists(path)) {
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                for (Path file : stream) {
+                    logger.info("--> found file: [{}]", file.toAbsolutePath().toString());
+                    if (Files.isDirectory(file)) {
+                        assertPathHasBeenCleared(file);
+                    } else if (Files.isRegularFile(file)) {
+                        count++;
+                        sb.append(file.toAbsolutePath().toString());
+                        sb.append("\n");
+                    }
+                }
+            }
+        }
+        sb.append("]");
+        assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
+    }
+
     protected static class NumShards {
         public final int numPrimaries;
         public final int numReplicas;
 
diff --git a/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java b/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java
new file mode 100644
index 0000000000000..b3d893c4362c2
--- /dev/null
+++ b/src/test/java/org/elasticsearch/test/engine/AssertingSearcher.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.engine;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.util.Map;
+
+/**
+ * A searcher that asserts the IndexReader's refcount on close
+ */
+public class AssertingSearcher extends Engine.Searcher {
+    private final Engine.Searcher wrappedSearcher;
+    private final ShardId shardId;
+    private final IndexSearcher indexSearcher;
+    private RuntimeException firstReleaseStack;
+    private final Object lock = new Object();
+    private final int initialRefCount;
+    private final ESLogger logger;
+    private final Map<AssertingSearcher, RuntimeException> inFlightSearchers;
+
+    public AssertingSearcher(IndexSearcher indexSearcher, Engine.Searcher wrappedSearcher,
+                             ShardId shardId, Map<AssertingSearcher, RuntimeException> inFlightSearchers,
+                             ESLogger logger) {
+        super(wrappedSearcher.source(), indexSearcher);
+        // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
+        // with a wrapped reader.
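+        // also record the reader's initial ref count and register this searcher
+        // in the in-flight map, so that assertAllSearchersClosed() can report
+        // searchers that were acquired but never released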
+        this.wrappedSearcher = wrappedSearcher;
+        this.logger = logger;
+        this.shardId = shardId;
+        initialRefCount = wrappedSearcher.reader().getRefCount();
+        this.indexSearcher = indexSearcher;
+        assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
+        this.inFlightSearchers = inFlightSearchers;
+        this.inFlightSearchers.put(this, new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]"));
+    }
+
+    @Override
+    public String source() {
+        return wrappedSearcher.source();
+    }
+
+    @Override
+    public void close() throws ElasticsearchException {
+        RuntimeException remove = inFlightSearchers.remove(this);
+        synchronized (lock) {
+            // make sure we only get this once and store the stack of the first caller!
+            if (remove == null) {
+                assert firstReleaseStack != null;
+                AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
+                error.initCause(firstReleaseStack);
+                throw error;
+            } else {
+                assert firstReleaseStack == null;
+                firstReleaseStack = new RuntimeException("Searcher Released first here, source [" + wrappedSearcher.source() + "]");
+            }
+        }
+        final int refCount = wrappedSearcher.reader().getRefCount();
+        // this assert seems to be paranoid but given LUCENE-5362 we better add some assertions here to make sure we catch any potential
+        // problems.
+        assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already closed. Initial refCount was: [" + initialRefCount + "]";
+        try {
+            wrappedSearcher.close();
+        } catch (RuntimeException ex) {
+            logger.debug("Failed to release searcher", ex);
+            throw ex;
+        }
+    }
+
+    @Override
+    public IndexReader reader() {
+        return indexSearcher.getIndexReader();
+    }
+
+    @Override
+    public IndexSearcher searcher() {
+        return indexSearcher;
+    }
+
+    public ShardId shardId() {
+        return shardId;
+    }
+}
diff --git a/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java b/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java
index 57d0d8eca3742..82b69b2bf035a 100644
--- a/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java
+++ b/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java
@@ -27,7 +27,12 @@
  */
 public final class MockEngineFactory implements EngineFactory {
     @Override
-    public Engine newEngine(EngineConfig config) {
+    public Engine newReadWriteEngine(EngineConfig config) {
         return new MockInternalEngine(config);
     }
+
+    @Override
+    public Engine newReadOnlyEngine(EngineConfig config) {
+        return new MockShadowEngine(config);
+    }
 }
diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
index e9e1037dff7f5..0a657ab0ee935 100644
--- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
+++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
@@ -30,7 +30,6 @@
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.InternalEngine;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 
 import java.io.IOException;
@@ -83,8 +82,9 @@ public void close() throws IOException {
         } finally {
             if (logger.isTraceEnabled()) {
                 // log debug if we have pending searchers
-                for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
-                    logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId);
+                for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
+                    logger.trace("Unreleased Searchers instance for shard [{}]",
+                            entry.getValue(), entry.getKey().shardId());
                 }
             }
         }
@@ -105,7 +105,8 @@ protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
         // pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
         // be released later on. If we wrap an index reader here we must not pass the wrapped version to the manager
         // on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
-        return new AssertingSearcher(assertingIndexSearcher, super.newSearcher(source, searcher, manager), shardId);
+        return new AssertingSearcher(assertingIndexSearcher,
+                super.newSearcher(source, searcher, manager), shardId, INFLIGHT_ENGINE_SEARCHERS, logger);
     }
 
     private DirectoryReader wrapReader(DirectoryReader reader) {
@@ -132,73 +133,6 @@ private DirectoryReader wrapReader(DirectoryReader reader) {
         return reader;
     }
 
-    public final class AssertingSearcher extends Searcher {
-        private final Searcher wrappedSearcher;
-        private final ShardId shardId;
-        private final IndexSearcher indexSearcher;
-        private RuntimeException firstReleaseStack;
-        private final Object lock = new Object();
-        private final int initialRefCount;
-
-        public AssertingSearcher(IndexSearcher indexSearcher, Searcher wrappedSearcher, ShardId shardId) {
-            super(wrappedSearcher.source(), indexSearcher);
-            // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
-            // with a wrapped reader.
-            this.wrappedSearcher = wrappedSearcher;
-            this.shardId = shardId;
-            initialRefCount = wrappedSearcher.reader().getRefCount();
-            this.indexSearcher = indexSearcher;
-            assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
-            INFLIGHT_ENGINE_SEARCHERS.put(this, new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]"));
-        }
-
-        @Override
-        public String source() {
-            return wrappedSearcher.source();
-        }
-
-        @Override
-        public void close() throws ElasticsearchException {
-            RuntimeException remove = INFLIGHT_ENGINE_SEARCHERS.remove(this);
-            synchronized (lock) {
-                // make sure we only get this once and store the stack of the first caller!
-                if (remove == null) {
-                    assert firstReleaseStack != null;
-                    AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
-                    error.initCause(firstReleaseStack);
-                    throw error;
-                } else {
-                    assert firstReleaseStack == null;
-                    firstReleaseStack = new RuntimeException("Searcher Released first here, source [" + wrappedSearcher.source() + "]");
-                }
-            }
-            final int refCount = wrappedSearcher.reader().getRefCount();
-            // this assert seems to be paranoid but given LUCENE-5362 we better add some assertions here to make sure we catch any potential
-            // problems.
-            assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already closed. Initial refCount was: [" + initialRefCount + "]";
-            try {
-                wrappedSearcher.close();
-            } catch (RuntimeException ex) {
-                logger.debug("Failed to release searcher", ex);
-                throw ex;
-            }
-        }
-
-        @Override
-        public IndexReader reader() {
-            return indexSearcher.getIndexReader();
-        }
-
-        @Override
-        public IndexSearcher searcher() {
-            return indexSearcher;
-        }
-
-        public ShardId shardId() {
-            return shardId;
-        }
-    }
-
     public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
         protected final SubReaderWrapper subReaderWrapper;
 
diff --git a/src/test/java/org/elasticsearch/test/engine/MockShadowEngine.java b/src/test/java/org/elasticsearch/test/engine/MockShadowEngine.java
new file mode 100644
index 0000000000000..f6597378ba8ff
--- /dev/null
+++ b/src/test/java/org/elasticsearch/test/engine/MockShadowEngine.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.engine;
+
+import org.apache.lucene.index.AssertingDirectoryReader;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.AssertingIndexSearcher;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherManager;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.EngineConfig;
+import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.ShadowEngine;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class MockShadowEngine extends ShadowEngine {
+
+    private final MockInternalEngine.MockContext mockContext;
+    public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
+
+    public MockShadowEngine(EngineConfig config) {
+        super(config);
+        Settings indexSettings = config.getIndexSettings();
+        final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
+        Random random = new Random(seed);
+        final double ratio = indexSettings.getAsDouble(MockInternalEngine.WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
+        Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(MockInternalEngine.READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
+        boolean wrapReader = random.nextDouble() < ratio;
+        logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
+        mockContext = new MockInternalEngine.MockContext(random, wrapReader, wrapper, indexSettings);
+    }
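+
+    // NOTE: mirrors MockInternalEngine#close(): after the real engine has shut
+    // down, any searcher still registered as in-flight is logged, since an
+    // entry that was never removed indicates a searcher ref-count leak in a test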
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } finally {
+            if (logger.isTraceEnabled()) {
+                // log debug if we have pending searchers
+                for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
+                    logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId());
+                }
+            }
+        }
+    }
+
+    @Override
+    protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
+
+        IndexReader reader = searcher.getIndexReader();
+        IndexReader wrappedReader = reader;
+        if (reader instanceof DirectoryReader && mockContext.wrapReader) {
+            wrappedReader = wrapReader((DirectoryReader) reader);
+        }
+        // this executes basic query checks and asserts that weights are normalized only once etc.
+        final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
+        assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
+        // pass the original searcher to the super.newSearcher() method to make
+        // sure this is the searcher that will be released later on. If we wrap
+        // an index reader here we must not pass the wrapped version to the manager
+        // on release otherwise the reader will be closed too early. - good
+        // news, stuff will fail all over the place if we don't get this
+        // right here
+        return new AssertingSearcher(assertingIndexSearcher,
+                super.newSearcher(source, searcher, manager), shardId,
+                INFLIGHT_ENGINE_SEARCHERS, logger);
+    }
+
+    private DirectoryReader wrapReader(DirectoryReader reader) {
+        try {
+            Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
+            Constructor<?> nonRandom = null;
+            for (Constructor<?> constructor : constructors) {
+                Class<?>[] parameterTypes = constructor.getParameterTypes();
+                if (parameterTypes.length > 0 && parameterTypes[0] == DirectoryReader.class) {
+                    if (parameterTypes.length == 1) {
+                        nonRandom = constructor;
+                    } else if (parameterTypes.length == 2 && parameterTypes[1] == Settings.class) {
+
+                        return (DirectoryReader) constructor.newInstance(reader, mockContext.indexSettings);
+                    }
+                }
+            }
+            if (nonRandom != null) {
+                return (DirectoryReader) nonRandom.newInstance(reader);
+            }
+        } catch (Exception e) {
+            throw new ElasticsearchException("Can not wrap reader", e);
+        }
+        return reader;
+    }
+
+}
diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
index 05f9e73b7c93a..d7abf2a6205c8 100644
--- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
+++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
@@ -23,7 +23,6 @@
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
-
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.ElasticsearchException;
@@ -65,7 +64,9 @@
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.test.engine.AssertingSearcher;
 import org.elasticsearch.test.engine.MockInternalEngine;
+import org.elasticsearch.test.engine.MockShadowEngine;
 import org.elasticsearch.test.store.MockDirectoryHelper;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -643,20 +644,26 @@ public static void assertAllSearchersClosed() {
         try {
             if (awaitBusy(new Predicate<Object>() {
                 public boolean apply(Object o) {
-                    return MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
+                    return MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty() &&
+                            MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
                 }
             }, 5, TimeUnit.SECONDS)) {
                 return;
             }
         } catch (InterruptedException ex) {
-            if (MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
+            if (MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty() &&
+                    MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
                 return;
             }
         }
         try {
             RuntimeException ex = null;
             StringBuilder builder = new StringBuilder("Unclosed Searchers instance for shards: [");
-            for (Map.Entry<MockInternalEngine.AssertingSearcher, RuntimeException> entry : MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
+            for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
+                ex = entry.getValue();
+                builder.append(entry.getKey().shardId()).append(",");
+            }
+            for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
                 ex = entry.getValue();
                 builder.append(entry.getKey().shardId()).append(",");
             }
@@ -664,6 +671,7 @@ public boolean apply(Object o) {
             throw new RuntimeException(builder.toString(), ex);
         } finally {
             MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.clear();
+            MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.clear();
         }
     }
 
diff --git a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java
index 8d840ac9bcc62..205ed928e30ed 100644
--- a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java
+++ b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java
@@ -26,12 +26,16 @@
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.StoreRateLimiting;
 import org.apache.lucene.util.AbstractRandomizedTest;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.IndexShardException;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -81,13 +85,13 @@ public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
                                        @IndexSettings Settings indexSettings) {
         if (indexShard != null && shardId.equals(sid)) {
             logger.info("Shard state before potentially flushing is {}", indexShard.state());
-            if (validCheckIndexStates.contains(indexShard.state())) {
-                canRun = true;
+            if (validCheckIndexStates.contains(indexShard.state()) && indexShard.engine() instanceof InternalEngine) {
                 // When the internal engine closes we do a rollback, which removes uncommitted segments
                 // By doing a commit flush we perform a Lucene commit, but don't clear the translog,
                 // so that even in tests where we don't flush we can check the integrity of the Lucene index
-                indexShard.engine().snapshotIndex(); // Keep translog for tests that rely on replaying it
+                Releasables.close(indexShard.engine().snapshotIndex()); // Keep translog for tests that rely on replaying it
                 logger.info("flush finished in beforeIndexShardClosed");
+                canRun = true;
             }
         }
     }